mirror of
https://github.com/prometheus/prometheus
synced 2025-01-26 01:13:33 +00:00
wal: Inject LiveReader metrics rather than registry
LiveReaders are instantiated `number of remote write queues * segments` times, which would cause double registration of the metrics. Instead this should be orchestrated by the layer above, instantiating the live reader. Signed-off-by: Frederic Branczyk <fbranczyk@gmail.com>
This commit is contained in:
parent
7ef10186b4
commit
078895bf56
@ -1,5 +1,9 @@
|
|||||||
## Master / unreleased
|
## Master / unreleased
|
||||||
|
|
||||||
|
## 0.9.1
|
||||||
|
|
||||||
|
- [CHANGE] LiveReader metrics are now injected rather than global.
|
||||||
|
|
||||||
## 0.9.0
|
## 0.9.0
|
||||||
|
|
||||||
- [FEATURE] Provide option to compress WAL records using Snappy. [#609](https://github.com/prometheus/tsdb/pull/609)
|
- [FEATURE] Provide option to compress WAL records using Snappy. [#609](https://github.com/prometheus/tsdb/pull/609)
|
||||||
@ -9,7 +13,7 @@
|
|||||||
- [BUGFIX] `prometheus_tsdb_compactions_failed_total` is now incremented on any compaction failure.
|
- [BUGFIX] `prometheus_tsdb_compactions_failed_total` is now incremented on any compaction failure.
|
||||||
- [CHANGE] The meta file `BlockStats` no longer holds size information. This is now dynamically calculated and kept in memory. It also includes the meta file size which was not included before.
|
- [CHANGE] The meta file `BlockStats` no longer holds size information. This is now dynamically calculated and kept in memory. It also includes the meta file size which was not included before.
|
||||||
- [CHANGE] Create new clean segment when starting the WAL.
|
- [CHANGE] Create new clean segment when starting the WAL.
|
||||||
- [CHANGE] Renamed metric from `prometheus_tsdb_wal_reader_corruption_errors` to `prometheus_tsdb_wal_reader_corruption_errors_total`
|
- [CHANGE] Renamed metric from `prometheus_tsdb_wal_reader_corruption_errors` to `prometheus_tsdb_wal_reader_corruption_errors_total`.
|
||||||
- [ENHANCEMENT] Improved atomicity of .tmp block replacement during compaction for usual case.
|
- [ENHANCEMENT] Improved atomicity of .tmp block replacement during compaction for usual case.
|
||||||
- [ENHANCEMENT] Improved postings intersection matching.
|
- [ENHANCEMENT] Improved postings intersection matching.
|
||||||
- [ENHANCEMENT] Reduced disk usage for WAL for small setups.
|
- [ENHANCEMENT] Reduced disk usage for WAL for small setups.
|
||||||
|
@ -27,26 +27,40 @@ import (
|
|||||||
"github.com/prometheus/client_golang/prometheus"
|
"github.com/prometheus/client_golang/prometheus"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
// liveReaderMetrics holds all metrics exposed by the LiveReader.
|
||||||
|
type liveReaderMetrics struct {
|
||||||
|
readerCorruptionErrors *prometheus.CounterVec
|
||||||
|
}
|
||||||
|
|
||||||
|
// LiveReaderMetrics instatiates, registers and returns metrics to be injected
|
||||||
|
// at LiveReader instantiation.
|
||||||
|
func NewLiveReaderMetrics(reg prometheus.Registerer) *liveReaderMetrics {
|
||||||
|
m := &liveReaderMetrics{
|
||||||
|
readerCorruptionErrors: prometheus.NewCounterVec(prometheus.CounterOpts{
|
||||||
|
Name: "prometheus_tsdb_wal_reader_corruption_errors_total",
|
||||||
|
Help: "Errors encountered when reading the WAL.",
|
||||||
|
}, []string{"error"}),
|
||||||
|
}
|
||||||
|
|
||||||
|
if reg != nil {
|
||||||
|
reg.Register(m.readerCorruptionErrors)
|
||||||
|
}
|
||||||
|
|
||||||
|
return m
|
||||||
|
}
|
||||||
|
|
||||||
// NewLiveReader returns a new live reader.
|
// NewLiveReader returns a new live reader.
|
||||||
func NewLiveReader(logger log.Logger, reg prometheus.Registerer, r io.Reader) *LiveReader {
|
func NewLiveReader(logger log.Logger, metrics *liveReaderMetrics, r io.Reader) *LiveReader {
|
||||||
lr := &LiveReader{
|
lr := &LiveReader{
|
||||||
logger: logger,
|
logger: logger,
|
||||||
rdr: r,
|
rdr: r,
|
||||||
|
metrics: metrics,
|
||||||
|
|
||||||
// Until we understand how they come about, make readers permissive
|
// Until we understand how they come about, make readers permissive
|
||||||
// to records spanning pages.
|
// to records spanning pages.
|
||||||
permissive: true,
|
permissive: true,
|
||||||
}
|
}
|
||||||
|
|
||||||
lr.readerCorruptionErrors = prometheus.NewCounterVec(prometheus.CounterOpts{
|
|
||||||
Name: "prometheus_tsdb_wal_reader_corruption_errors_total",
|
|
||||||
Help: "Errors encountered when reading the WAL.",
|
|
||||||
}, []string{"error"})
|
|
||||||
|
|
||||||
if reg != nil {
|
|
||||||
reg.MustRegister(lr.readerCorruptionErrors)
|
|
||||||
}
|
|
||||||
|
|
||||||
return lr
|
return lr
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -74,7 +88,7 @@ type LiveReader struct {
|
|||||||
// NB the non-ive Reader implementation allows for this.
|
// NB the non-ive Reader implementation allows for this.
|
||||||
permissive bool
|
permissive bool
|
||||||
|
|
||||||
readerCorruptionErrors *prometheus.CounterVec
|
metrics *liveReaderMetrics
|
||||||
}
|
}
|
||||||
|
|
||||||
// Err returns any errors encountered reading the WAL. io.EOFs are not terminal
|
// Err returns any errors encountered reading the WAL. io.EOFs are not terminal
|
||||||
@ -282,7 +296,7 @@ func (r *LiveReader) readRecord() ([]byte, int, error) {
|
|||||||
if !r.permissive {
|
if !r.permissive {
|
||||||
return nil, 0, fmt.Errorf("record would overflow current page: %d > %d", r.readIndex+recordHeaderSize+length, pageSize)
|
return nil, 0, fmt.Errorf("record would overflow current page: %d > %d", r.readIndex+recordHeaderSize+length, pageSize)
|
||||||
}
|
}
|
||||||
r.readerCorruptionErrors.WithLabelValues("record_span_page").Inc()
|
r.metrics.readerCorruptionErrors.WithLabelValues("record_span_page").Inc()
|
||||||
level.Warn(r.logger).Log("msg", "record spans page boundaries", "start", r.readIndex, "end", recordHeaderSize+length, "pageSize", pageSize)
|
level.Warn(r.logger).Log("msg", "record spans page boundaries", "start", r.readIndex, "end", recordHeaderSize+length, "pageSize", pageSize)
|
||||||
}
|
}
|
||||||
if recordHeaderSize+length > pageSize {
|
if recordHeaderSize+length > pageSize {
|
||||||
|
@ -51,7 +51,7 @@ var readerConstructors = map[string]func(io.Reader) reader{
|
|||||||
return NewReader(r)
|
return NewReader(r)
|
||||||
},
|
},
|
||||||
"LiveReader": func(r io.Reader) reader {
|
"LiveReader": func(r io.Reader) reader {
|
||||||
lr := NewLiveReader(log.NewNopLogger(), nil, r)
|
lr := NewLiveReader(log.NewNopLogger(), NewLiveReaderMetrics(nil), r)
|
||||||
lr.eofNonErr = true
|
lr.eofNonErr = true
|
||||||
return lr
|
return lr
|
||||||
},
|
},
|
||||||
@ -216,7 +216,7 @@ func TestReader_Live(t *testing.T) {
|
|||||||
// Read from a second FD on the same file.
|
// Read from a second FD on the same file.
|
||||||
readFd, err := os.Open(writeFd.Name())
|
readFd, err := os.Open(writeFd.Name())
|
||||||
testutil.Ok(t, err)
|
testutil.Ok(t, err)
|
||||||
reader := NewLiveReader(logger, nil, readFd)
|
reader := NewLiveReader(logger, NewLiveReaderMetrics(nil), readFd)
|
||||||
for _, exp := range testReaderCases[i].exp {
|
for _, exp := range testReaderCases[i].exp {
|
||||||
for !reader.Next() {
|
for !reader.Next() {
|
||||||
testutil.Assert(t, reader.Err() == io.EOF, "expect EOF, got: %v", reader.Err())
|
testutil.Assert(t, reader.Err() == io.EOF, "expect EOF, got: %v", reader.Err())
|
||||||
@ -518,7 +518,7 @@ func TestLiveReaderCorrupt_RecordTooLongAndShort(t *testing.T) {
|
|||||||
testutil.Ok(t, err)
|
testutil.Ok(t, err)
|
||||||
defer seg.Close()
|
defer seg.Close()
|
||||||
|
|
||||||
r := NewLiveReader(logger, nil, seg)
|
r := NewLiveReader(logger, NewLiveReaderMetrics(nil), seg)
|
||||||
testutil.Assert(t, r.Next() == false, "expected no records")
|
testutil.Assert(t, r.Next() == false, "expected no records")
|
||||||
testutil.Assert(t, r.Err().Error() == "record length greater than a single page: 65542 > 32768", "expected error, got: %v", r.Err())
|
testutil.Assert(t, r.Err().Error() == "record length greater than a single page: 65542 > 32768", "expected error, got: %v", r.Err())
|
||||||
}
|
}
|
||||||
|
Loading…
Reference in New Issue
Block a user