diff --git a/CHANGELOG.md b/CHANGELOG.md index c84792a1e..9d057a3bc 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,5 +1,9 @@ ## Master / unreleased +## 0.9.1 + + - [CHANGE] LiveReader metrics are now injected rather than global. + ## 0.9.0 - [FEATURE] Provide option to compress WAL records using Snappy. [#609](https://github.com/prometheus/tsdb/pull/609) @@ -9,7 +13,7 @@ - [BUGFIX] `prometheus_tsdb_compactions_failed_total` is now incremented on any compaction failure. - [CHANGE] The meta file `BlockStats` no longer holds size information. This is now dynamically calculated and kept in memory. It also includes the meta file size which was not included before. - [CHANGE] Create new clean segment when starting the WAL. - - [CHANGE] Renamed metric from `prometheus_tsdb_wal_reader_corruption_errors` to `prometheus_tsdb_wal_reader_corruption_errors_total` + - [CHANGE] Renamed metric from `prometheus_tsdb_wal_reader_corruption_errors` to `prometheus_tsdb_wal_reader_corruption_errors_total`. - [ENHANCEMENT] Improved atomicity of .tmp block replacement during compaction for usual case. - [ENHANCEMENT] Improved postings intersection matching. - [ENHANCEMENT] Reduced disk usage for WAL for small setups. diff --git a/wal/live_reader.go b/wal/live_reader.go index fb0485230..94175e791 100644 --- a/wal/live_reader.go +++ b/wal/live_reader.go @@ -27,26 +27,40 @@ import ( "github.com/prometheus/client_golang/prometheus" ) +// liveReaderMetrics holds all metrics exposed by the LiveReader. +type liveReaderMetrics struct { + readerCorruptionErrors *prometheus.CounterVec +} + +// LiveReaderMetrics instatiates, registers and returns metrics to be injected +// at LiveReader instantiation. +func NewLiveReaderMetrics(reg prometheus.Registerer) *liveReaderMetrics { + m := &liveReaderMetrics{ + readerCorruptionErrors: prometheus.NewCounterVec(prometheus.CounterOpts{ + Name: "prometheus_tsdb_wal_reader_corruption_errors_total", + Help: "Errors encountered when reading the WAL.", + }, []string{"error"}), + } + + if reg != nil { + reg.Register(m.readerCorruptionErrors) + } + + return m +} + // NewLiveReader returns a new live reader. -func NewLiveReader(logger log.Logger, reg prometheus.Registerer, r io.Reader) *LiveReader { +func NewLiveReader(logger log.Logger, metrics *liveReaderMetrics, r io.Reader) *LiveReader { lr := &LiveReader{ - logger: logger, - rdr: r, + logger: logger, + rdr: r, + metrics: metrics, // Until we understand how they come about, make readers permissive // to records spanning pages. permissive: true, } - lr.readerCorruptionErrors = prometheus.NewCounterVec(prometheus.CounterOpts{ - Name: "prometheus_tsdb_wal_reader_corruption_errors_total", - Help: "Errors encountered when reading the WAL.", - }, []string{"error"}) - - if reg != nil { - reg.MustRegister(lr.readerCorruptionErrors) - } - return lr } @@ -74,7 +88,7 @@ type LiveReader struct { // NB the non-ive Reader implementation allows for this. permissive bool - readerCorruptionErrors *prometheus.CounterVec + metrics *liveReaderMetrics } // Err returns any errors encountered reading the WAL. io.EOFs are not terminal @@ -282,7 +296,7 @@ func (r *LiveReader) readRecord() ([]byte, int, error) { if !r.permissive { return nil, 0, fmt.Errorf("record would overflow current page: %d > %d", r.readIndex+recordHeaderSize+length, pageSize) } - r.readerCorruptionErrors.WithLabelValues("record_span_page").Inc() + r.metrics.readerCorruptionErrors.WithLabelValues("record_span_page").Inc() level.Warn(r.logger).Log("msg", "record spans page boundaries", "start", r.readIndex, "end", recordHeaderSize+length, "pageSize", pageSize) } if recordHeaderSize+length > pageSize { diff --git a/wal/reader_test.go b/wal/reader_test.go index 1e15cae84..96d152254 100644 --- a/wal/reader_test.go +++ b/wal/reader_test.go @@ -51,7 +51,7 @@ var readerConstructors = map[string]func(io.Reader) reader{ return NewReader(r) }, "LiveReader": func(r io.Reader) reader { - lr := NewLiveReader(log.NewNopLogger(), nil, r) + lr := NewLiveReader(log.NewNopLogger(), NewLiveReaderMetrics(nil), r) lr.eofNonErr = true return lr }, @@ -216,7 +216,7 @@ func TestReader_Live(t *testing.T) { // Read from a second FD on the same file. readFd, err := os.Open(writeFd.Name()) testutil.Ok(t, err) - reader := NewLiveReader(logger, nil, readFd) + reader := NewLiveReader(logger, NewLiveReaderMetrics(nil), readFd) for _, exp := range testReaderCases[i].exp { for !reader.Next() { testutil.Assert(t, reader.Err() == io.EOF, "expect EOF, got: %v", reader.Err()) @@ -518,7 +518,7 @@ func TestLiveReaderCorrupt_RecordTooLongAndShort(t *testing.T) { testutil.Ok(t, err) defer seg.Close() - r := NewLiveReader(logger, nil, seg) + r := NewLiveReader(logger, NewLiveReaderMetrics(nil), seg) testutil.Assert(t, r.Next() == false, "expected no records") testutil.Assert(t, r.Err().Error() == "record length greater than a single page: 65542 > 32768", "expected error, got: %v", r.Err()) }