Instrument chunks on level 1 compactions

This commit is contained in:
Fabian Reinartz 2017-09-01 16:10:10 +02:00
parent 893b6ec506
commit 9f41d9fd3c

View File

@ -70,6 +70,9 @@ type compactorMetrics struct {
ran prometheus.Counter
failed prometheus.Counter
duration prometheus.Histogram
chunkSize prometheus.Histogram
chunkSamples prometheus.Histogram
chunkRange prometheus.Histogram
}
func newCompactorMetrics(r prometheus.Registerer) *compactorMetrics {
@ -83,9 +86,25 @@ func newCompactorMetrics(r prometheus.Registerer) *compactorMetrics {
Name: "tsdb_compactions_failed_total",
Help: "Total number of compactions that failed for the partition.",
})
m.duration = prometheus.NewSummary(prometheus.SummaryOpts{
m.duration = prometheus.NewHistogram(prometheus.HistogramOpts{
Name: "tsdb_compaction_duration",
Help: "Duration of compaction runs.",
Buckets: prometheus.ExponentialBuckets(1, 2, 10),
})
m.chunkSize = prometheus.NewHistogram(prometheus.HistogramOpts{
Name: "tsdb_compaction_chunk_size",
Help: "Final size of chunks on their first compaction",
Buckets: prometheus.ExponentialBuckets(32, 1.5, 12),
})
m.chunkSamples = prometheus.NewHistogram(prometheus.HistogramOpts{
Name: "tsdb_compaction_chunk_samples",
Help: "Final number of samples on their first compaction",
Buckets: prometheus.ExponentialBuckets(4, 1.5, 12),
})
m.chunkRange = prometheus.NewHistogram(prometheus.HistogramOpts{
Name: "tsdb_compaction_chunk_range",
Help: "Final time range of chunks on their first compaction",
Buckets: prometheus.ExponentialBuckets(100, 4, 10),
})
if r != nil {
@ -93,6 +112,9 @@ func newCompactorMetrics(r prometheus.Registerer) *compactorMetrics {
m.ran,
m.failed,
m.duration,
m.chunkRange,
m.chunkSamples,
m.chunkSize,
)
}
return m
@ -312,6 +334,25 @@ func (c *LeveledCompactor) Write(dest string, b BlockReader, mint, maxt int64) e
return c.write(dest, meta, b)
}
// instrumentedChunkWriter is used for level 1 compactions to record statistics
// about compacted chunks.
type instrumentedChunkWriter struct {
ChunkWriter
size prometheus.Histogram
samples prometheus.Histogram
trange prometheus.Histogram
}
func (w *instrumentedChunkWriter) WriteChunks(chunks ...ChunkMeta) error {
for _, c := range chunks {
w.size.Observe(float64(len(c.Chunk.Bytes())))
w.samples.Observe(float64(c.Chunk.NumSamples()))
w.trange.Observe(float64(c.MaxTime - c.MinTime))
}
return w.ChunkWriter.WriteChunks(chunks...)
}
// write creates a new block that is the union of the provided blocks into dir.
// It cleans up all files of the old blocks after completing successfully.
func (c *LeveledCompactor) write(dest string, meta *BlockMeta, blocks ...BlockReader) (err error) {
@ -338,10 +379,22 @@ func (c *LeveledCompactor) write(dest string, meta *BlockMeta, blocks ...BlockRe
// Populate chunk and index files into temporary directory with
// data of all blocks.
chunkw, err := newChunkWriter(chunkDir(tmp))
var chunkw ChunkWriter
chunkw, err = newChunkWriter(chunkDir(tmp))
if err != nil {
return errors.Wrap(err, "open chunk writer")
}
// Record written chunk sizes on level 1 compactions.
if meta.Compaction.Level == 1 {
chunkw = &instrumentedChunkWriter{
ChunkWriter: chunkw,
size: c.metrics.chunkSize,
samples: c.metrics.chunkSamples,
trange: c.metrics.chunkRange,
}
}
indexw, err := newIndexWriter(tmp)
if err != nil {
return errors.Wrap(err, "open index writer")