Make Appends after Delete visible.

Signed-off-by: Goutham Veeramachaneni <cs14btech11014@iith.ac.in>
This commit is contained in:
Goutham Veeramachaneni 2017-05-22 11:28:24 +05:30
parent 009dd2cde5
commit 662d8173fe
No known key found for this signature in database
GPG Key ID: F1C217E8E9023CAD
3 changed files with 48 additions and 5 deletions

View File

@ -242,7 +242,7 @@ func (pb *persistedBlock) Delete(mint, maxt int64, ms ...labels.Matcher) error {
ir := pb.indexr ir := pb.indexr
// Choose only valid postings which have chunks in the time-range. // Choose only valid postings which have chunks in the time-range.
vPostings := []uint32{} delStones := map[uint32][]trange{}
Outer: Outer:
for p.Next() { for p.Next() {
@ -259,7 +259,12 @@ Outer:
for _, chk := range chunks { for _, chk := range chunks {
if intervalOverlap(mint, maxt, chk.MinTime, chk.MaxTime) { if intervalOverlap(mint, maxt, chk.MinTime, chk.MaxTime) {
vPostings = append(vPostings, p.At()) // Delete only until the current maxtime and not beyond.
maxtime := chunks[len(chunks)-1].MaxTime
if maxtime > maxt {
maxtime = maxt
}
delStones[p.At()] = []trange{{mint, maxtime}}
continue Outer continue Outer
} }
} }
@ -271,7 +276,7 @@ Outer:
// Merge the current and new tombstones. // Merge the current and new tombstones.
tr := pb.Tombstones() tr := pb.Tombstones()
str := newSimpleTombstoneReader(vPostings, []trange{{mint, maxt}}) str := newMapTombstoneReader(delStones)
tombreader := newMergedTombstoneReader(tr, str) tombreader := newMergedTombstoneReader(tr, str)
return writeTombstoneFile(pb.dir, tombreader) return writeTombstoneFile(pb.dir, tombreader)

View File

@ -250,7 +250,12 @@ Outer:
} }
} }
h.tombstones.stones[ref] = addNewInterval(h.tombstones.stones[ref], trange{mint, maxt}) // Delete only until the current values and not beyond.
maxtime := h.series[ref].head().maxTime
if maxtime > maxt {
maxtime = maxt
}
h.tombstones.stones[ref] = addNewInterval(h.tombstones.stones[ref], trange{mint, maxtime})
} }
if p.Err() != nil { if p.Err() != nil {

View File

@ -382,7 +382,7 @@ func TestHeadBlock_e2e(t *testing.T) {
return return
} }
func TestDelete_simple(t *testing.T) { func TestDeleteSimple(t *testing.T) {
numSamples := int64(10) numSamples := int64(10)
dir, _ := ioutil.TempDir("", "test") dir, _ := ioutil.TempDir("", "test")
@ -474,6 +474,39 @@ Outer:
} }
} }
func TestDeleteUntilCurMax(t *testing.T) {
numSamples := int64(10)
dir, _ := ioutil.TempDir("", "test")
defer os.RemoveAll(dir)
hb := createTestHeadBlock(t, dir, 0, 2*numSamples)
app := hb.Appender()
smpls := make([]float64, numSamples)
for i := int64(0); i < numSamples; i++ {
smpls[i] = rand.Float64()
app.Add(labels.Labels{{"a", "b"}}, i, smpls[i])
}
require.NoError(t, app.Commit())
require.NoError(t, hb.Delete(0, 10000, labels.NewEqualMatcher("a", "b")))
app = hb.Appender()
_, err := app.Add(labels.Labels{{"a", "b"}}, 11, 1)
require.NoError(t, err)
require.NoError(t, app.Commit())
q := hb.Querier(0, 100000)
res := q.Select(labels.NewEqualMatcher("a", "b"))
require.True(t, res.Next())
exps := res.At()
it := exps.Iterator()
ressmpls, err := expandSeriesIterator(it)
require.NoError(t, err)
require.Equal(t, []sample{{11, 1}}, ressmpls)
}
func TestDelete_e2e(t *testing.T) { func TestDelete_e2e(t *testing.T) {
numDatapoints := 1000 numDatapoints := 1000
numRanges := 1000 numRanges := 1000