mirror of
https://github.com/prometheus/prometheus
synced 2025-04-01 22:59:03 +00:00
Set the min time of Head properly after truncation (#8212)
* Set the min time of Head properly after truncation Signed-off-by: Ganesh Vernekar <cs15btech11018@iith.ac.in> * Fix lint Signed-off-by: Ganesh Vernekar <cs15btech11018@iith.ac.in> * Enhance compaction plan logic for completely deleted small block Signed-off-by: Ganesh Vernekar <cs15btech11018@iith.ac.in> * Fix review comments Signed-off-by: Ganesh Vernekar <cs15btech11018@iith.ac.in>
This commit is contained in:
parent
f50dde7031
commit
dff967286e
@ -214,6 +214,11 @@ func (c *LeveledCompactor) plan(dms []dirMeta) ([]string, error) {
|
|||||||
for i := len(dms) - 1; i >= 0; i-- {
|
for i := len(dms) - 1; i >= 0; i-- {
|
||||||
meta := dms[i].meta
|
meta := dms[i].meta
|
||||||
if meta.MaxTime-meta.MinTime < c.ranges[len(c.ranges)/2] {
|
if meta.MaxTime-meta.MinTime < c.ranges[len(c.ranges)/2] {
|
||||||
|
// If the block is entirely deleted, then we don't care about the block being big enough.
|
||||||
|
// TODO: This is assuming single tombstone is for distinct series, which might be no true.
|
||||||
|
if meta.Stats.NumTombstones > 0 && meta.Stats.NumTombstones >= meta.Stats.NumSeries {
|
||||||
|
return []string{dms[i].dir}, nil
|
||||||
|
}
|
||||||
break
|
break
|
||||||
}
|
}
|
||||||
if float64(meta.Stats.NumTombstones)/float64(meta.Stats.NumSeries+1) > 0.05 {
|
if float64(meta.Stats.NumTombstones)/float64(meta.Stats.NumSeries+1) > 0.05 {
|
||||||
|
53
tsdb/head.go
53
tsdb/head.go
@ -820,9 +820,22 @@ func (h *Head) truncateMemory(mint int64) (err error) {
|
|||||||
h.metrics.headTruncateTotal.Inc()
|
h.metrics.headTruncateTotal.Inc()
|
||||||
start := time.Now()
|
start := time.Now()
|
||||||
|
|
||||||
h.gc()
|
actualMint := h.gc()
|
||||||
level.Info(h.logger).Log("msg", "Head GC completed", "duration", time.Since(start))
|
level.Info(h.logger).Log("msg", "Head GC completed", "duration", time.Since(start))
|
||||||
h.metrics.gcDuration.Observe(time.Since(start).Seconds())
|
h.metrics.gcDuration.Observe(time.Since(start).Seconds())
|
||||||
|
if actualMint > h.minTime.Load() {
|
||||||
|
// The actual mint of the Head is higher than the one asked to truncate.
|
||||||
|
appendableMinValidTime := h.appendableMinValidTime()
|
||||||
|
if actualMint < appendableMinValidTime {
|
||||||
|
h.minTime.Store(actualMint)
|
||||||
|
h.minValidTime.Store(actualMint)
|
||||||
|
} else {
|
||||||
|
// The actual min time is in the appendable window.
|
||||||
|
// So we set the mint to the appendableMinValidTime.
|
||||||
|
h.minTime.Store(appendableMinValidTime)
|
||||||
|
h.minValidTime.Store(appendableMinValidTime)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
// Truncate the chunk m-mapper.
|
// Truncate the chunk m-mapper.
|
||||||
if err := h.chunkDiskMapper.Truncate(mint); err != nil {
|
if err := h.chunkDiskMapper.Truncate(mint); err != nil {
|
||||||
@ -1054,10 +1067,8 @@ func (h *Head) appender() *headAppender {
|
|||||||
cleanupAppendIDsBelow := h.iso.lowWatermark()
|
cleanupAppendIDsBelow := h.iso.lowWatermark()
|
||||||
|
|
||||||
return &headAppender{
|
return &headAppender{
|
||||||
head: h,
|
head: h,
|
||||||
// Set the minimum valid time to whichever is greater the head min valid time or the compaction window.
|
minValidTime: h.appendableMinValidTime(),
|
||||||
// This ensures that no samples will be added within the compaction window to avoid races.
|
|
||||||
minValidTime: max(h.minValidTime.Load(), h.MaxTime()-h.chunkRange.Load()/2),
|
|
||||||
mint: math.MaxInt64,
|
mint: math.MaxInt64,
|
||||||
maxt: math.MinInt64,
|
maxt: math.MinInt64,
|
||||||
samples: h.getAppendBuffer(),
|
samples: h.getAppendBuffer(),
|
||||||
@ -1067,6 +1078,12 @@ func (h *Head) appender() *headAppender {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (h *Head) appendableMinValidTime() int64 {
|
||||||
|
// Setting the minimum valid time to whichever is greater, the head min valid time or the compaction window,
|
||||||
|
// ensures that no samples will be added within the compaction window to avoid races.
|
||||||
|
return max(h.minValidTime.Load(), h.MaxTime()-h.chunkRange.Load()/2)
|
||||||
|
}
|
||||||
|
|
||||||
func max(a, b int64) int64 {
|
func max(a, b int64) int64 {
|
||||||
if a > b {
|
if a > b {
|
||||||
return a
|
return a
|
||||||
@ -1335,13 +1352,14 @@ func (h *Head) Delete(mint, maxt int64, ms ...*labels.Matcher) error {
|
|||||||
}
|
}
|
||||||
|
|
||||||
// gc removes data before the minimum timestamp from the head.
|
// gc removes data before the minimum timestamp from the head.
|
||||||
func (h *Head) gc() {
|
// It returns the actual min times of the chunks present in the Head.
|
||||||
|
func (h *Head) gc() int64 {
|
||||||
// Only data strictly lower than this timestamp must be deleted.
|
// Only data strictly lower than this timestamp must be deleted.
|
||||||
mint := h.MinTime()
|
mint := h.MinTime()
|
||||||
|
|
||||||
// Drop old chunks and remember series IDs and hashes if they can be
|
// Drop old chunks and remember series IDs and hashes if they can be
|
||||||
// deleted entirely.
|
// deleted entirely.
|
||||||
deleted, chunksRemoved := h.series.gc(mint)
|
deleted, chunksRemoved, actualMint := h.series.gc(mint)
|
||||||
seriesRemoved := len(deleted)
|
seriesRemoved := len(deleted)
|
||||||
|
|
||||||
h.metrics.seriesRemoved.Add(float64(seriesRemoved))
|
h.metrics.seriesRemoved.Add(float64(seriesRemoved))
|
||||||
@ -1382,6 +1400,8 @@ func (h *Head) gc() {
|
|||||||
panic(err)
|
panic(err)
|
||||||
}
|
}
|
||||||
h.symbols = symbols
|
h.symbols = symbols
|
||||||
|
|
||||||
|
return actualMint
|
||||||
}
|
}
|
||||||
|
|
||||||
// Tombstones returns a new reader over the head's tombstones
|
// Tombstones returns a new reader over the head's tombstones
|
||||||
@ -1813,11 +1833,12 @@ func newStripeSeries(stripeSize int, seriesCallback SeriesLifecycleCallback) *st
|
|||||||
|
|
||||||
// gc garbage collects old chunks that are strictly before mint and removes
|
// gc garbage collects old chunks that are strictly before mint and removes
|
||||||
// series entirely that have no chunks left.
|
// series entirely that have no chunks left.
|
||||||
func (s *stripeSeries) gc(mint int64) (map[uint64]struct{}, int) {
|
func (s *stripeSeries) gc(mint int64) (map[uint64]struct{}, int, int64) {
|
||||||
var (
|
var (
|
||||||
deleted = map[uint64]struct{}{}
|
deleted = map[uint64]struct{}{}
|
||||||
deletedForCallback = []labels.Labels{}
|
deletedForCallback = []labels.Labels{}
|
||||||
rmChunks = 0
|
rmChunks = 0
|
||||||
|
actualMint int64 = math.MaxInt64
|
||||||
)
|
)
|
||||||
// Run through all series and truncate old chunks. Mark those with no
|
// Run through all series and truncate old chunks. Mark those with no
|
||||||
// chunks left as deleted and store their ID.
|
// chunks left as deleted and store their ID.
|
||||||
@ -1830,6 +1851,10 @@ func (s *stripeSeries) gc(mint int64) (map[uint64]struct{}, int) {
|
|||||||
rmChunks += series.truncateChunksBefore(mint)
|
rmChunks += series.truncateChunksBefore(mint)
|
||||||
|
|
||||||
if len(series.mmappedChunks) > 0 || series.headChunk != nil || series.pendingCommit {
|
if len(series.mmappedChunks) > 0 || series.headChunk != nil || series.pendingCommit {
|
||||||
|
seriesMint := series.minTime()
|
||||||
|
if seriesMint < actualMint {
|
||||||
|
actualMint = seriesMint
|
||||||
|
}
|
||||||
series.Unlock()
|
series.Unlock()
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
@ -1864,7 +1889,11 @@ func (s *stripeSeries) gc(mint int64) (map[uint64]struct{}, int) {
|
|||||||
deletedForCallback = deletedForCallback[:0]
|
deletedForCallback = deletedForCallback[:0]
|
||||||
}
|
}
|
||||||
|
|
||||||
return deleted, rmChunks
|
if actualMint == math.MaxInt64 {
|
||||||
|
actualMint = mint
|
||||||
|
}
|
||||||
|
|
||||||
|
return deleted, rmChunks, actualMint
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *stripeSeries) getByID(id uint64) *memSeries {
|
func (s *stripeSeries) getByID(id uint64) *memSeries {
|
||||||
|
@ -1897,3 +1897,37 @@ func TestErrReuseAppender(t *testing.T) {
|
|||||||
require.Error(t, app.Commit())
|
require.Error(t, app.Commit())
|
||||||
require.Error(t, app.Rollback())
|
require.Error(t, app.Rollback())
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func TestHeadMintAfterTruncation(t *testing.T) {
|
||||||
|
chunkRange := int64(2000)
|
||||||
|
head, _ := newTestHead(t, chunkRange, false)
|
||||||
|
|
||||||
|
app := head.Appender(context.Background())
|
||||||
|
_, err := app.Add(labels.Labels{{Name: "a", Value: "b"}}, 100, 100)
|
||||||
|
require.NoError(t, err)
|
||||||
|
_, err = app.Add(labels.Labels{{Name: "a", Value: "b"}}, 4000, 200)
|
||||||
|
require.NoError(t, err)
|
||||||
|
_, err = app.Add(labels.Labels{{Name: "a", Value: "b"}}, 8000, 300)
|
||||||
|
require.NoError(t, err)
|
||||||
|
require.NoError(t, app.Commit())
|
||||||
|
|
||||||
|
// Truncating outside the appendable window and actual mint being outside
|
||||||
|
// appendable window should leave mint at the actual mint.
|
||||||
|
require.NoError(t, head.Truncate(3500))
|
||||||
|
require.Equal(t, int64(4000), head.MinTime())
|
||||||
|
require.Equal(t, int64(4000), head.minValidTime.Load())
|
||||||
|
|
||||||
|
// After truncation outside the appendable windown if the actual min time
|
||||||
|
// is in the appendable window then we should leave mint at the start of appendable window.
|
||||||
|
require.NoError(t, head.Truncate(5000))
|
||||||
|
require.Equal(t, head.appendableMinValidTime(), head.MinTime())
|
||||||
|
require.Equal(t, head.appendableMinValidTime(), head.minValidTime.Load())
|
||||||
|
|
||||||
|
// If the truncation time is inside the appendable window, then the min time
|
||||||
|
// should be the truncation time.
|
||||||
|
require.NoError(t, head.Truncate(7500))
|
||||||
|
require.Equal(t, int64(7500), head.MinTime())
|
||||||
|
require.Equal(t, int64(7500), head.minValidTime.Load())
|
||||||
|
|
||||||
|
require.NoError(t, head.Close())
|
||||||
|
}
|
||||||
|
Loading…
Reference in New Issue
Block a user