diff --git a/wal/live_reader.go b/wal/live_reader.go index 8394bfd08..f85978ba1 100644 --- a/wal/live_reader.go +++ b/wal/live_reader.go @@ -24,19 +24,11 @@ import ( "github.com/go-kit/kit/log/level" "github.com/pkg/errors" "github.com/prometheus/client_golang/prometheus" - "github.com/prometheus/client_golang/prometheus/promauto" -) - -var ( - readerCorruptionErrors = promauto.NewCounterVec(prometheus.CounterOpts{ - Name: "prometheus_tsdb_wal_reader_corruption_errors", - Help: "Errors encountered when reading the WAL.", - }, []string{"error"}) ) // NewLiveReader returns a new live reader. -func NewLiveReader(logger log.Logger, r io.Reader) *LiveReader { - return &LiveReader{ +func NewLiveReader(logger log.Logger, reg prometheus.Registerer, r io.Reader) *LiveReader { + lr := &LiveReader{ logger: logger, rdr: r, @@ -44,6 +36,17 @@ func NewLiveReader(logger log.Logger, r io.Reader) *LiveReader { // to records spanning pages. permissive: true, } + + lr.readerCorruptionErrors = prometheus.NewCounterVec(prometheus.CounterOpts{ + Name: "prometheus_tsdb_wal_reader_corruption_errors", + Help: "Errors encountered when reading the WAL.", + }, []string{"error"}) + + if reg != nil { + reg.MustRegister(lr.readerCorruptionErrors) + } + + return lr } // LiveReader reads WAL records from an io.Reader. It allows reading of WALs @@ -68,6 +71,8 @@ type LiveReader struct { // does. Until we track down why, set permissive to true to tolerate it. // NB the non-ive Reader implementation allows for this. permissive bool + + readerCorruptionErrors *prometheus.CounterVec } // Err returns any errors encountered reading the WAL. io.EOFs are not terminal @@ -258,7 +263,7 @@ func (r *LiveReader) readRecord() ([]byte, int, error) { if !r.permissive { return nil, 0, fmt.Errorf("record would overflow current page: %d > %d", r.readIndex+recordHeaderSize+length, pageSize) } - readerCorruptionErrors.WithLabelValues("record_span_page").Inc() + r.readerCorruptionErrors.WithLabelValues("record_span_page").Inc() level.Warn(r.logger).Log("msg", "record spans page boundaries", "start", r.readIndex, "end", recordHeaderSize+length, "pageSize", pageSize) } if recordHeaderSize+length > pageSize { diff --git a/wal/reader_test.go b/wal/reader_test.go index 1178aa5ef..1a21d7bdb 100644 --- a/wal/reader_test.go +++ b/wal/reader_test.go @@ -51,7 +51,7 @@ var readerConstructors = map[string]func(io.Reader) reader{ return NewReader(r) }, "LiveReader": func(r io.Reader) reader { - lr := NewLiveReader(log.NewNopLogger(), r) + lr := NewLiveReader(log.NewNopLogger(), nil, r) lr.eofNonErr = true return lr }, @@ -216,7 +216,7 @@ func TestReader_Live(t *testing.T) { // Read from a second FD on the same file. readFd, err := os.Open(writeFd.Name()) testutil.Ok(t, err) - reader := NewLiveReader(logger, readFd) + reader := NewLiveReader(logger, nil, readFd) for _, exp := range testReaderCases[i].exp { for !reader.Next() { testutil.Assert(t, reader.Err() == io.EOF, "expect EOF, got: %v", reader.Err()) @@ -374,7 +374,7 @@ func TestReaderFuzz_Live(t *testing.T) { testutil.Ok(t, err) defer seg.Close() - r := NewLiveReader(logger, seg) + r := NewLiveReader(logger, nil, seg) segmentTicker := time.NewTicker(100 * time.Millisecond) readTicker := time.NewTicker(10 * time.Millisecond) @@ -410,7 +410,7 @@ outer: seg, err = OpenReadSegment(SegmentName(dir, seg.i+1)) testutil.Ok(t, err) defer seg.Close() - r = NewLiveReader(logger, seg) + r = NewLiveReader(logger, nil, seg) case <-readTicker.C: readSegment(r) @@ -464,7 +464,7 @@ func TestLiveReaderCorrupt_ShortFile(t *testing.T) { testutil.Ok(t, err) defer seg.Close() - r := NewLiveReader(logger, seg) + r := NewLiveReader(logger, nil, seg) testutil.Assert(t, r.Next() == false, "expected no records") testutil.Assert(t, r.Err() == io.EOF, "expected error, got: %v", r.Err()) } @@ -512,7 +512,7 @@ func TestLiveReaderCorrupt_RecordTooLongAndShort(t *testing.T) { testutil.Ok(t, err) defer seg.Close() - r := NewLiveReader(logger, seg) + r := NewLiveReader(logger, nil, seg) testutil.Assert(t, r.Next() == false, "expected no records") testutil.Assert(t, r.Err().Error() == "record length greater than a single page: 65542 > 32768", "expected error, got: %v", r.Err()) }