mirror of
https://github.com/prometheus/prometheus
synced 2024-12-26 00:23:18 +00:00
Call delete on head if interval overlaps (#9151)
* Call delete on head if interval overlaps Signed-off-by: darshanime <deathbullet@gmail.com> * Garbage collect tombstones during head gc Signed-off-by: darshanime <deathbullet@gmail.com> * Truncate tombstones before min time during head gc Signed-off-by: darshanime <deathbullet@gmail.com> * Lock less by deleting all keys in a single pass Signed-off-by: darshanime <deathbullet@gmail.com> * Pass map to DeleteTombstones Signed-off-by: darshanime <deathbullet@gmail.com> * Create new slice to replace old one Signed-off-by: darshanime <deathbullet@gmail.com>
This commit is contained in:
parent
bd217c58a7
commit
1f688657bf
@ -1663,9 +1663,12 @@ func (db *DB) Delete(mint, maxt int64, ms ...*labels.Matcher) error {
|
||||
}(b))
|
||||
}
|
||||
}
|
||||
g.Go(func() error {
|
||||
return db.head.Delete(mint, maxt, ms...)
|
||||
})
|
||||
if db.head.OverlapsClosedInterval(mint, maxt) {
|
||||
g.Go(func() error {
|
||||
return db.head.Delete(mint, maxt, ms...)
|
||||
})
|
||||
}
|
||||
|
||||
return g.Wait()
|
||||
}
|
||||
|
||||
|
@ -736,6 +736,11 @@ func (h *Head) Truncate(mint int64) (err error) {
|
||||
return h.truncateWAL(mint)
|
||||
}
|
||||
|
||||
// OverlapsClosedInterval returns true if the head overlaps [mint, maxt].
|
||||
func (h *Head) OverlapsClosedInterval(mint, maxt int64) bool {
|
||||
return h.MinTime() <= maxt && mint <= h.MaxTime()
|
||||
}
|
||||
|
||||
// truncateMemory removes old data before mint from the head.
|
||||
func (h *Head) truncateMemory(mint int64) (err error) {
|
||||
h.chunkSnapshotMtx.Lock()
|
||||
@ -1101,6 +1106,10 @@ func (h *Head) gc() int64 {
|
||||
// Remove deleted series IDs from the postings lists.
|
||||
h.postings.Delete(deleted)
|
||||
|
||||
// Remove tombstones referring to the deleted series.
|
||||
h.tombstones.DeleteTombstones(deleted)
|
||||
h.tombstones.TruncateBefore(mint)
|
||||
|
||||
if h.wal != nil {
|
||||
_, last, _ := wal.Segments(h.wal.Dir())
|
||||
h.deletedMtx.Lock()
|
||||
|
@ -252,6 +252,34 @@ func (t *MemTombstones) Get(ref uint64) (Intervals, error) {
|
||||
return t.intvlGroups[ref], nil
|
||||
}
|
||||
|
||||
func (t *MemTombstones) DeleteTombstones(refs map[uint64]struct{}) {
|
||||
t.mtx.Lock()
|
||||
defer t.mtx.Unlock()
|
||||
for ref := range refs {
|
||||
delete(t.intvlGroups, ref)
|
||||
}
|
||||
}
|
||||
|
||||
func (t *MemTombstones) TruncateBefore(beforeT int64) {
|
||||
t.mtx.Lock()
|
||||
defer t.mtx.Unlock()
|
||||
for ref, ivs := range t.intvlGroups {
|
||||
i := len(ivs) - 1
|
||||
for ; i >= 0; i-- {
|
||||
if beforeT > ivs[i].Maxt {
|
||||
break
|
||||
}
|
||||
}
|
||||
if len(ivs[i+1:]) == 0 {
|
||||
delete(t.intvlGroups, ref)
|
||||
} else {
|
||||
newIvs := make(Intervals, len(ivs[i+1:]))
|
||||
copy(newIvs, ivs[i+1:])
|
||||
t.intvlGroups[ref] = newIvs
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (t *MemTombstones) Iter(f func(uint64, Intervals) error) error {
|
||||
t.mtx.RLock()
|
||||
defer t.mtx.RUnlock()
|
||||
|
@ -63,6 +63,66 @@ func TestWriteAndReadbackTombstones(t *testing.T) {
|
||||
require.Equal(t, stones, restr)
|
||||
}
|
||||
|
||||
func TestDeletingTombstones(t *testing.T) {
|
||||
stones := NewMemTombstones()
|
||||
|
||||
ref := uint64(42)
|
||||
mint := rand.Int63n(time.Now().UnixNano())
|
||||
dranges := make(Intervals, 0, 1)
|
||||
dranges = dranges.Add(Interval{mint, mint + rand.Int63n(1000)})
|
||||
stones.AddInterval(ref, dranges...)
|
||||
stones.AddInterval(uint64(43), dranges...)
|
||||
|
||||
intervals, err := stones.Get(ref)
|
||||
require.NoError(t, err)
|
||||
require.Equal(t, intervals, dranges)
|
||||
|
||||
stones.DeleteTombstones(map[uint64]struct{}{ref: struct{}{}})
|
||||
|
||||
intervals, err = stones.Get(ref)
|
||||
require.NoError(t, err)
|
||||
require.Empty(t, intervals)
|
||||
}
|
||||
|
||||
func TestTruncateBefore(t *testing.T) {
|
||||
cases := []struct {
|
||||
before Intervals
|
||||
beforeT int64
|
||||
after Intervals
|
||||
}{
|
||||
{
|
||||
before: Intervals{{1, 2}, {4, 10}, {12, 100}},
|
||||
beforeT: 3,
|
||||
after: Intervals{{4, 10}, {12, 100}},
|
||||
},
|
||||
{
|
||||
before: Intervals{{1, 2}, {4, 10}, {12, 100}, {200, 1000}},
|
||||
beforeT: 900,
|
||||
after: Intervals{{200, 1000}},
|
||||
},
|
||||
{
|
||||
before: Intervals{{1, 2}, {4, 10}, {12, 100}, {200, 1000}},
|
||||
beforeT: 2000,
|
||||
after: nil,
|
||||
},
|
||||
{
|
||||
before: Intervals{{1, 2}, {4, 10}, {12, 100}, {200, 1000}},
|
||||
beforeT: 0,
|
||||
after: Intervals{{1, 2}, {4, 10}, {12, 100}, {200, 1000}},
|
||||
},
|
||||
}
|
||||
for _, c := range cases {
|
||||
ref := uint64(42)
|
||||
stones := NewMemTombstones()
|
||||
stones.AddInterval(ref, c.before...)
|
||||
|
||||
stones.TruncateBefore(c.beforeT)
|
||||
ts, err := stones.Get(ref)
|
||||
require.NoError(t, err)
|
||||
require.Equal(t, c.after, ts)
|
||||
}
|
||||
}
|
||||
|
||||
func TestAddingNewIntervals(t *testing.T) {
|
||||
cases := []struct {
|
||||
exist Intervals
|
||||
|
Loading…
Reference in New Issue
Block a user