Merge pull request #210 from Gouthamve/tomb-clean
Add a function to cleanup tombstones.
Commit 30bbbe34f8

block.go (+24)
@@ -347,6 +347,30 @@ Outer:
 	return writeMetaFile(pb.dir, &pb.meta)
 }
 
+// CleanTombstones rewrites the block to drop the data covered by its tombstones
+// and reports whether a rewrite actually happened.
+func (pb *Block) CleanTombstones(dest string, c Compactor) (bool, error) {
+	numStones := 0
+
+	pb.tombstones.Iter(func(id uint64, ivs Intervals) error {
+		for range ivs {
+			numStones++
+		}
+
+		return nil
+	})
+
+	if numStones == 0 {
+		return false, nil
+	}
+
+	if _, err := c.Write(dest, pb, pb.meta.MinTime, pb.meta.MaxTime); err != nil {
+		return false, err
+	}
+
+	return true, nil
+}
+
 // Snapshot creates snapshot of the block into dir.
 func (pb *Block) Snapshot(dir string) error {
 	blockDir := filepath.Join(dir, pb.meta.ULID.String())
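For illustration only, here is a minimal in-package sketch (not part of this commit) of how the new method is driven for a single block. The names cleanOne, dest, b, and c are placeholders; the real caller in this PR is DB.CleanTombstones in db.go below.

// cleanOne is a sketch only: rewrite one block into dest if it carries tombstones.
func cleanOne(dest string, b *Block, c Compactor) (bool, error) {
	rewritten, err := b.CleanTombstones(dest, c)
	if err != nil {
		return false, err
	}
	// When rewritten is true, a tombstone-free copy of the block now lives under
	// dest; the old block directory still has to be dropped, which the DB does
	// via reload after running this over every block.
	return rewritten, nil
}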
db.go (+37)
@@ -122,6 +122,7 @@ type dbMetrics struct {
 	reloads              prometheus.Counter
 	reloadsFailed        prometheus.Counter
 	compactionsTriggered prometheus.Counter
+	tombCleanTimer       prometheus.Histogram
 }
 
 func newDBMetrics(db *DB, r prometheus.Registerer) *dbMetrics {
@@ -147,6 +148,10 @@ func newDBMetrics(db *DB, r prometheus.Registerer) *dbMetrics {
 		Name: "prometheus_tsdb_compactions_triggered_total",
 		Help: "Total number of triggered compactions for the partition.",
 	})
+	m.tombCleanTimer = prometheus.NewHistogram(prometheus.HistogramOpts{
+		Name: "prometheus_tsdb_tombstone_cleanup_seconds",
+		Help: "The time taken to recompact blocks to remove tombstones.",
+	})
 
 	if r != nil {
 		r.MustRegister(
@@ -154,6 +159,7 @@ func newDBMetrics(db *DB, r prometheus.Registerer) *dbMetrics {
 			m.reloads,
 			m.reloadsFailed,
 			m.compactionsTriggered,
+			m.tombCleanTimer,
 		)
 	}
 	return m
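As an aside, a sketch of how the new histogram becomes visible to a scraper. This is not part of the PR; it assumes the standard client_golang promhttp handler, prometheus.DefaultRegisterer, a local "data" directory, and port :2112 as placeholders.

package main

import (
	"log"
	"net/http"

	"github.com/prometheus/client_golang/prometheus"
	"github.com/prometheus/client_golang/prometheus/promhttp"
	"github.com/prometheus/tsdb"
)

func main() {
	// Passing a registerer to Open is what exposes
	// prometheus_tsdb_tombstone_cleanup_seconds alongside the other TSDB metrics.
	db, err := tsdb.Open("data", nil, prometheus.DefaultRegisterer, nil)
	if err != nil {
		log.Fatal(err)
	}
	defer db.Close()

	// Serve everything registered against the default registry.
	http.Handle("/metrics", promhttp.Handler())
	log.Fatal(http.ListenAndServe(":2112", nil))
}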
@@ -691,6 +697,37 @@ func (db *DB) Delete(mint, maxt int64, ms ...labels.Matcher) error {
 	return nil
 }
 
+// CleanTombstones re-writes any blocks with tombstones.
+func (db *DB) CleanTombstones() error {
+	db.cmtx.Lock()
+	defer db.cmtx.Unlock()
+
+	start := time.Now()
+	defer func() { db.metrics.tombCleanTimer.Observe(time.Since(start).Seconds()) }()
+
+	db.mtx.RLock()
+	blocks := db.blocks[:]
+	db.mtx.RUnlock()
+
+	deleted := []string{}
+	for _, b := range blocks {
+		ok, err := b.CleanTombstones(db.Dir(), db.compactor)
+		if err != nil {
+			return errors.Wrapf(err, "clean tombstones: %s", b.Dir())
+		}
+
+		if ok {
+			deleted = append(deleted, b.Dir())
+		}
+	}
+
+	if len(deleted) == 0 {
+		return nil
+	}
+
+	return errors.Wrap(db.reload(deleted...), "reload blocks")
+}
+
 func intervalOverlap(amin, amax, bmin, bmax int64) bool {
 	// Checks Overlap: http://stackoverflow.com/questions/3269434/
 	return amin <= bmax && bmin <= amax
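To show how the pieces fit together from a caller's point of view, a minimal end-to-end sketch (not from this PR): delete a range, then reclaim the space on disk. The "data" directory, the [0, 100] range, and the {a="b"} matcher are placeholders.

package main

import (
	"log"

	"github.com/prometheus/tsdb"
	"github.com/prometheus/tsdb/labels"
)

func main() {
	// Open (or create) a database; nil logger, registerer, and options use defaults.
	db, err := tsdb.Open("data", nil, nil, nil)
	if err != nil {
		log.Fatal(err)
	}
	defer db.Close()

	// Delete a time range for all series matching {a="b"}; this only writes
	// tombstones, the samples stay on disk.
	if err := db.Delete(0, 100, labels.NewEqualMatcher("a", "b")); err != nil {
		log.Fatal(err)
	}

	// Rewrite any blocks carrying tombstones so the deleted data is physically
	// removed; the DB then reloads and drops the old block directories.
	if err := db.CleanTombstones(); err != nil {
		log.Fatal(err)
	}
}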
db_test.go (+91)
@@ -644,6 +644,97 @@ func TestWALFlushedOnDBClose(t *testing.T) {
 	require.Equal(t, values, []string{"labelvalue"})
 }
 
+func TestTombstoneClean(t *testing.T) {
+	numSamples := int64(10)
+
+	db, close := openTestDB(t, nil)
+	defer close()
+
+	app := db.Appender()
+
+	smpls := make([]float64, numSamples)
+	for i := int64(0); i < numSamples; i++ {
+		smpls[i] = rand.Float64()
+		app.Add(labels.Labels{{"a", "b"}}, i, smpls[i])
+	}
+
+	require.NoError(t, app.Commit())
+	cases := []struct {
+		intervals Intervals
+		remaint   []int64
+	}{
+		{
+			intervals: Intervals{{1, 3}, {4, 7}},
+			remaint:   []int64{0, 8, 9},
+		},
+	}
+
+	for _, c := range cases {
+		// Delete the ranges.
+
+		// Create a snapshot and close the original DB.
+		snap, err := ioutil.TempDir("", "snap")
+		require.NoError(t, err)
+		require.NoError(t, db.Snapshot(snap))
+		require.NoError(t, db.Close())
+
+		// Reopen the DB from the snapshot.
+		db, err = Open(snap, nil, nil, nil)
+		require.NoError(t, err)
+
+		for _, r := range c.intervals {
+			require.NoError(t, db.Delete(r.Mint, r.Maxt, labels.NewEqualMatcher("a", "b")))
+		}
+
+		// All of the setup for THIS line.
+		require.NoError(t, db.CleanTombstones())
+
+		// Compare the result.
+		q, err := db.Querier(0, numSamples)
+		require.NoError(t, err)
+
+		res, err := q.Select(labels.NewEqualMatcher("a", "b"))
+		require.NoError(t, err)
+
+		expSamples := make([]sample, 0, len(c.remaint))
+		for _, ts := range c.remaint {
+			expSamples = append(expSamples, sample{ts, smpls[ts]})
+		}
+
+		expss := newListSeriesSet([]Series{
+			newSeries(map[string]string{"a": "b"}, expSamples),
+		})
+
+		if len(expSamples) == 0 {
+			require.False(t, res.Next())
+			continue
+		}
+
+		for {
+			eok, rok := expss.Next(), res.Next()
+			require.Equal(t, eok, rok, "next")
+
+			if !eok {
+				break
+			}
+			sexp := expss.At()
+			sres := res.At()
+
+			require.Equal(t, sexp.Labels(), sres.Labels(), "labels")
+
+			smplExp, errExp := expandSeriesIterator(sexp.Iterator())
+			smplRes, errRes := expandSeriesIterator(sres.Iterator())
+
+			require.Equal(t, errExp, errRes, "samples error")
+			require.Equal(t, smplExp, smplRes, "samples")
+		}
+
+		for _, b := range db.blocks {
+			require.Equal(t, emptyTombstoneReader, b.tombstones)
+		}
+	}
+}
+
 func TestDB_Retention(t *testing.T) {
 	tmpdir, _ := ioutil.TempDir("", "test")
 	defer os.RemoveAll(tmpdir)