wal: Replace promauto with injected registry

Signed-off-by: Frederic Branczyk <fbranczyk@gmail.com>
This commit is contained in:
Frederic Branczyk 2019-06-03 12:27:30 +02:00
parent 882162d5b9
commit abd15845e2
No known key found for this signature in database
GPG Key ID: 7741A52782A90069
2 changed files with 22 additions and 17 deletions

View File

@ -24,19 +24,11 @@ import (
"github.com/go-kit/kit/log/level" "github.com/go-kit/kit/log/level"
"github.com/pkg/errors" "github.com/pkg/errors"
"github.com/prometheus/client_golang/prometheus" "github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
)
var (
readerCorruptionErrors = promauto.NewCounterVec(prometheus.CounterOpts{
Name: "prometheus_tsdb_wal_reader_corruption_errors",
Help: "Errors encountered when reading the WAL.",
}, []string{"error"})
) )
// NewLiveReader returns a new live reader. // NewLiveReader returns a new live reader.
func NewLiveReader(logger log.Logger, r io.Reader) *LiveReader { func NewLiveReader(logger log.Logger, reg prometheus.Registerer, r io.Reader) *LiveReader {
return &LiveReader{ lr := &LiveReader{
logger: logger, logger: logger,
rdr: r, rdr: r,
@ -44,6 +36,17 @@ func NewLiveReader(logger log.Logger, r io.Reader) *LiveReader {
// to records spanning pages. // to records spanning pages.
permissive: true, permissive: true,
} }
lr.readerCorruptionErrors = prometheus.NewCounterVec(prometheus.CounterOpts{
Name: "prometheus_tsdb_wal_reader_corruption_errors",
Help: "Errors encountered when reading the WAL.",
}, []string{"error"})
if reg != nil {
reg.MustRegister(lr.readerCorruptionErrors)
}
return lr
} }
// LiveReader reads WAL records from an io.Reader. It allows reading of WALs // LiveReader reads WAL records from an io.Reader. It allows reading of WALs
@ -68,6 +71,8 @@ type LiveReader struct {
// does. Until we track down why, set permissive to true to tolerate it. // does. Until we track down why, set permissive to true to tolerate it.
// NB the non-ive Reader implementation allows for this. // NB the non-ive Reader implementation allows for this.
permissive bool permissive bool
readerCorruptionErrors *prometheus.CounterVec
} }
// Err returns any errors encountered reading the WAL. io.EOFs are not terminal // Err returns any errors encountered reading the WAL. io.EOFs are not terminal
@ -258,7 +263,7 @@ func (r *LiveReader) readRecord() ([]byte, int, error) {
if !r.permissive { if !r.permissive {
return nil, 0, fmt.Errorf("record would overflow current page: %d > %d", r.readIndex+recordHeaderSize+length, pageSize) return nil, 0, fmt.Errorf("record would overflow current page: %d > %d", r.readIndex+recordHeaderSize+length, pageSize)
} }
readerCorruptionErrors.WithLabelValues("record_span_page").Inc() r.readerCorruptionErrors.WithLabelValues("record_span_page").Inc()
level.Warn(r.logger).Log("msg", "record spans page boundaries", "start", r.readIndex, "end", recordHeaderSize+length, "pageSize", pageSize) level.Warn(r.logger).Log("msg", "record spans page boundaries", "start", r.readIndex, "end", recordHeaderSize+length, "pageSize", pageSize)
} }
if recordHeaderSize+length > pageSize { if recordHeaderSize+length > pageSize {

View File

@ -51,7 +51,7 @@ var readerConstructors = map[string]func(io.Reader) reader{
return NewReader(r) return NewReader(r)
}, },
"LiveReader": func(r io.Reader) reader { "LiveReader": func(r io.Reader) reader {
lr := NewLiveReader(log.NewNopLogger(), r) lr := NewLiveReader(log.NewNopLogger(), nil, r)
lr.eofNonErr = true lr.eofNonErr = true
return lr return lr
}, },
@ -216,7 +216,7 @@ func TestReader_Live(t *testing.T) {
// Read from a second FD on the same file. // Read from a second FD on the same file.
readFd, err := os.Open(writeFd.Name()) readFd, err := os.Open(writeFd.Name())
testutil.Ok(t, err) testutil.Ok(t, err)
reader := NewLiveReader(logger, readFd) reader := NewLiveReader(logger, nil, readFd)
for _, exp := range testReaderCases[i].exp { for _, exp := range testReaderCases[i].exp {
for !reader.Next() { for !reader.Next() {
testutil.Assert(t, reader.Err() == io.EOF, "expect EOF, got: %v", reader.Err()) testutil.Assert(t, reader.Err() == io.EOF, "expect EOF, got: %v", reader.Err())
@ -374,7 +374,7 @@ func TestReaderFuzz_Live(t *testing.T) {
testutil.Ok(t, err) testutil.Ok(t, err)
defer seg.Close() defer seg.Close()
r := NewLiveReader(logger, seg) r := NewLiveReader(logger, nil, seg)
segmentTicker := time.NewTicker(100 * time.Millisecond) segmentTicker := time.NewTicker(100 * time.Millisecond)
readTicker := time.NewTicker(10 * time.Millisecond) readTicker := time.NewTicker(10 * time.Millisecond)
@ -410,7 +410,7 @@ outer:
seg, err = OpenReadSegment(SegmentName(dir, seg.i+1)) seg, err = OpenReadSegment(SegmentName(dir, seg.i+1))
testutil.Ok(t, err) testutil.Ok(t, err)
defer seg.Close() defer seg.Close()
r = NewLiveReader(logger, seg) r = NewLiveReader(logger, nil, seg)
case <-readTicker.C: case <-readTicker.C:
readSegment(r) readSegment(r)
@ -464,7 +464,7 @@ func TestLiveReaderCorrupt_ShortFile(t *testing.T) {
testutil.Ok(t, err) testutil.Ok(t, err)
defer seg.Close() defer seg.Close()
r := NewLiveReader(logger, seg) r := NewLiveReader(logger, nil, seg)
testutil.Assert(t, r.Next() == false, "expected no records") testutil.Assert(t, r.Next() == false, "expected no records")
testutil.Assert(t, r.Err() == io.EOF, "expected error, got: %v", r.Err()) testutil.Assert(t, r.Err() == io.EOF, "expected error, got: %v", r.Err())
} }
@ -512,7 +512,7 @@ func TestLiveReaderCorrupt_RecordTooLongAndShort(t *testing.T) {
testutil.Ok(t, err) testutil.Ok(t, err)
defer seg.Close() defer seg.Close()
r := NewLiveReader(logger, seg) r := NewLiveReader(logger, nil, seg)
testutil.Assert(t, r.Next() == false, "expected no records") testutil.Assert(t, r.Next() == false, "expected no records")
testutil.Assert(t, r.Err().Error() == "record length greater than a single page: 65542 > 32768", "expected error, got: %v", r.Err()) testutil.Assert(t, r.Err().Error() == "record length greater than a single page: 65542 > 32768", "expected error, got: %v", r.Err())
} }