diff --git a/block.go b/block.go
index 034f33673..36db47680 100644
--- a/block.go
+++ b/block.go
@@ -335,6 +335,20 @@ Outer:
 	return writeMetaFile(pb.dir, &pb.meta)
 }
 
+// CleanTombstones asks the compactor c to write the block to dest when the
+// block has tombstones. The returned bool reports whether a rewrite happened.
+func (pb *Block) CleanTombstones(dest string, c Compactor) (bool, error) {
+	if len(pb.tombstones) == 0 {
+		return false, nil
+	}
+
+	mint, maxt := pb.meta.MinTime, pb.meta.MaxTime
+	if err := c.Write(dest, pb, mint, maxt); err != nil {
+		return false, err
+	}
+	return true, nil
+}
+
 // Snapshot creates snapshot of the block into dir.
 func (pb *Block) Snapshot(dir string) error {
 	blockDir := filepath.Join(dir, pb.meta.ULID.String())
diff --git a/db.go b/db.go
index f1972f00b..47afb4c13 100644
--- a/db.go
+++ b/db.go
@@ -122,6 +122,7 @@ type dbMetrics struct {
 	reloads              prometheus.Counter
 	reloadsFailed        prometheus.Counter
 	compactionsTriggered prometheus.Counter
+	tombCleanTimer       prometheus.Histogram
 }
 
 func newDBMetrics(db *DB, r prometheus.Registerer) *dbMetrics {
@@ -147,6 +148,10 @@ func newDBMetrics(db *DB, r prometheus.Registerer) *dbMetrics {
 		Name: "prometheus_tsdb_compactions_triggered_total",
 		Help: "Total number of triggered compactions for the partition.",
 	})
+	m.tombCleanTimer = prometheus.NewHistogram(prometheus.HistogramOpts{
+		Name: "prometheus_tsdb_tombstone_cleanup_seconds",
+		Help: "The time taken to recompact blocks to remove tombstones.",
+	})
 
 	if r != nil {
 		r.MustRegister(
@@ -154,6 +159,7 @@ func newDBMetrics(db *DB, r prometheus.Registerer) *dbMetrics {
 			m.reloads,
 			m.reloadsFailed,
 			m.compactionsTriggered,
+			m.tombCleanTimer,
 		)
 	}
 	return m
@@ -686,6 +692,37 @@ func (db *DB) Delete(mint, maxt int64, ms ...labels.Matcher) error {
 	return nil
 }
 
+// CleanTombstones re-writes any blocks with tombstones.
+func (db *DB) CleanTombstones() error {
+	db.cmtx.Lock()
+	defer db.cmtx.Unlock()
+
+	// Time the whole run. The closure is required: a plain deferred call
+	// evaluates time.Since(start) right here and would always observe ~0s.
+	start := time.Now()
+	defer func() { db.metrics.tombCleanTimer.Observe(time.Since(start).Seconds()) }()
+
+	db.mtx.RLock()
+	blocks := db.blocks[:]
+	db.mtx.RUnlock()
+
+	// Directories of the blocks that were rewritten; they are handed to reload.
+	var deleted []string
+	for _, b := range blocks {
+		ok, err := b.CleanTombstones(db.Dir(), db.compactor)
+		if err != nil {
+			return errors.Wrapf(err, "clean tombstones: %s", b.Dir())
+		}
+		if ok {
+			deleted = append(deleted, b.Dir())
+		}
+	}
+	if len(deleted) == 0 {
+		return nil
+	}
+	return errors.Wrap(db.reload(deleted...), "reload blocks")
+}
+
 func intervalOverlap(amin, amax, bmin, bmax int64) bool {
 	// Checks Overlap: http://stackoverflow.com/questions/3269434/
 	return amin <= bmax && bmin <= amax
diff --git a/db_test.go b/db_test.go
index 9f91705e7..b4bb0c9bc 100644
--- a/db_test.go
+++ b/db_test.go
@@ -551,3 +551,96 @@ func TestWALFlushedOnDBClose(t *testing.T) {
 	require.NoError(t, err)
 	require.Equal(t, values, []string{"labelvalue"})
 }
+
+// TestTombstoneClean checks that, after CleanTombstones, a query returns only
+// the samples that were not deleted and no block has tombstones left.
+func TestTombstoneClean(t *testing.T) {
+	numSamples := int64(10)
+
+	db, close := openTestDB(t, nil)
+	defer close()
+
+	app := db.Appender()
+
+	smpls := make([]float64, numSamples)
+	for i := int64(0); i < numSamples; i++ {
+		smpls[i] = rand.Float64()
+		_, err := app.Add(labels.Labels{{"a", "b"}}, i, smpls[i])
+		require.NoError(t, err)
+	}
+
+	require.NoError(t, app.Commit())
+	cases := []struct {
+		intervals Intervals
+		remaint   []int64
+	}{
+		{
+			intervals: Intervals{{1, 3}, {4, 7}},
+			remaint:   []int64{0, 8, 9},
+		},
+	}
+
+	for _, c := range cases {
+		// Create a snapshot of the current DB and close the DB.
+		snap, err := ioutil.TempDir("", "snap")
+		require.NoError(t, err)
+		require.NoError(t, db.Snapshot(snap))
+		require.NoError(t, db.Close())
+
+		// Reopen the DB from the snapshot.
+		db, err = Open(snap, nil, nil, nil)
+		require.NoError(t, err)
+
+		// Delete the ranges.
+		for _, r := range c.intervals {
+			require.NoError(t, db.Delete(r.Mint, r.Maxt, labels.NewEqualMatcher("a", "b")))
+		}
+
+		// Everything above is setup for this call.
+		require.NoError(t, db.CleanTombstones())
+
+		// Compare the result.
+		q, err := db.Querier(0, numSamples)
+		require.NoError(t, err)
+
+		res := q.Select(labels.NewEqualMatcher("a", "b"))
+
+		expSamples := make([]sample, 0, len(c.remaint))
+		for _, ts := range c.remaint {
+			expSamples = append(expSamples, sample{ts, smpls[ts]})
+		}
+
+		expss := newListSeriesSet([]Series{
+			newSeries(map[string]string{"a": "b"}, expSamples),
+		})
+
+		if len(expSamples) == 0 {
+			require.False(t, res.Next())
+			continue
+		}
+
+		for {
+			eok, rok := expss.Next(), res.Next()
+			require.Equal(t, eok, rok, "next")
+
+			if !eok {
+				break
+			}
+			sexp := expss.At()
+			sres := res.At()
+
+			require.Equal(t, sexp.Labels(), sres.Labels(), "labels")
+
+			smplExp, errExp := expandSeriesIterator(sexp.Iterator())
+			smplRes, errRes := expandSeriesIterator(sres.Iterator())
+
+			require.Equal(t, errExp, errRes, "samples error")
+			require.Equal(t, smplExp, smplRes, "samples")
+		}
+
+		// No tombstones may remain on any block after the cleanup.
+		for _, b := range db.blocks {
+			require.Equal(t, 0, len(b.tombstones))
+		}
+	}
+}