From 662d8173fedc1b5bb8c2701a22f0b08a38b395e0 Mon Sep 17 00:00:00 2001 From: Goutham Veeramachaneni Date: Mon, 22 May 2017 11:28:24 +0530 Subject: [PATCH] Make Appends after Delete visible. Signed-off-by: Goutham Veeramachaneni --- block.go | 11 ++++++++--- head.go | 7 ++++++- head_test.go | 35 ++++++++++++++++++++++++++++++++++- 3 files changed, 48 insertions(+), 5 deletions(-) diff --git a/block.go b/block.go index 03a2e569a..296b119eb 100644 --- a/block.go +++ b/block.go @@ -242,7 +242,7 @@ func (pb *persistedBlock) Delete(mint, maxt int64, ms ...labels.Matcher) error { ir := pb.indexr // Choose only valid postings which have chunks in the time-range. - vPostings := []uint32{} + delStones := map[uint32][]trange{} Outer: for p.Next() { @@ -259,7 +259,12 @@ Outer: for _, chk := range chunks { if intervalOverlap(mint, maxt, chk.MinTime, chk.MaxTime) { - vPostings = append(vPostings, p.At()) + // Delete only until the current maxtime and not beyond. + maxtime := chunks[len(chunks)-1].MaxTime + if maxtime > maxt { + maxtime = maxt + } + delStones[p.At()] = []trange{{mint, maxtime}} continue Outer } } @@ -271,7 +276,7 @@ Outer: // Merge the current and new tombstones. tr := pb.Tombstones() - str := newSimpleTombstoneReader(vPostings, []trange{{mint, maxt}}) + str := newMapTombstoneReader(delStones) tombreader := newMergedTombstoneReader(tr, str) return writeTombstoneFile(pb.dir, tombreader) diff --git a/head.go b/head.go index 84434753d..c98a4deff 100644 --- a/head.go +++ b/head.go @@ -250,7 +250,12 @@ Outer: } } - h.tombstones.stones[ref] = addNewInterval(h.tombstones.stones[ref], trange{mint, maxt}) + // Delete only until the current values and not beyond. + maxtime := h.series[ref].head().maxTime + if maxtime > maxt { + maxtime = maxt + } + h.tombstones.stones[ref] = addNewInterval(h.tombstones.stones[ref], trange{mint, maxtime}) } if p.Err() != nil { diff --git a/head_test.go b/head_test.go index 36f6a49df..ef841cdb0 100644 --- a/head_test.go +++ b/head_test.go @@ -382,7 +382,7 @@ func TestHeadBlock_e2e(t *testing.T) { return } -func TestDelete_simple(t *testing.T) { +func TestDeleteSimple(t *testing.T) { numSamples := int64(10) dir, _ := ioutil.TempDir("", "test") @@ -474,6 +474,39 @@ Outer: } } +func TestDeleteUntilCurMax(t *testing.T) { + numSamples := int64(10) + + dir, _ := ioutil.TempDir("", "test") + defer os.RemoveAll(dir) + + hb := createTestHeadBlock(t, dir, 0, 2*numSamples) + app := hb.Appender() + + smpls := make([]float64, numSamples) + for i := int64(0); i < numSamples; i++ { + smpls[i] = rand.Float64() + app.Add(labels.Labels{{"a", "b"}}, i, smpls[i]) + } + + require.NoError(t, app.Commit()) + require.NoError(t, hb.Delete(0, 10000, labels.NewEqualMatcher("a", "b"))) + app = hb.Appender() + _, err := app.Add(labels.Labels{{"a", "b"}}, 11, 1) + require.NoError(t, err) + require.NoError(t, app.Commit()) + + q := hb.Querier(0, 100000) + res := q.Select(labels.NewEqualMatcher("a", "b")) + + require.True(t, res.Next()) + exps := res.At() + it := exps.Iterator() + ressmpls, err := expandSeriesIterator(it) + require.NoError(t, err) + require.Equal(t, []sample{{11, 1}}, ressmpls) +} + func TestDelete_e2e(t *testing.T) { numDatapoints := 1000 numRanges := 1000