diff --git a/db_test.go b/db_test.go index 92d487eec..c63eaf780 100644 --- a/db_test.go +++ b/db_test.go @@ -209,31 +209,48 @@ func TestDBAppenderAddRef(t *testing.T) { func TestDeleteSimple(t *testing.T) { numSamples := int64(10) - db, close := openTestDB(t, nil) - defer close() - defer db.Close() - - app := db.Appender() - - smpls := make([]float64, numSamples) - for i := int64(0); i < numSamples; i++ { - smpls[i] = rand.Float64() - app.Add(labels.Labels{{"a", "b"}}, i, smpls[i]) - } - - testutil.Ok(t, app.Commit()) cases := []struct { intervals Intervals remaint []int64 }{ + { + intervals: Intervals{{0, 3}}, + remaint: []int64{4, 5, 6, 7, 8, 9}, + }, + { + intervals: Intervals{{1, 3}}, + remaint: []int64{0, 4, 5, 6, 7, 8, 9}, + }, { intervals: Intervals{{1, 3}, {4, 7}}, remaint: []int64{0, 8, 9}, }, + { + intervals: Intervals{{1, 3}, {4, 700}}, + remaint: []int64{0}, + }, + { // This case is to ensure that labels and symbols are deleted. + intervals: Intervals{{0, 9}}, + remaint: []int64{}, + }, } Outer: for _, c := range cases { + db, close := openTestDB(t, nil) + defer close() + defer db.Close() + + app := db.Appender() + + smpls := make([]float64, numSamples) + for i := int64(0); i < numSamples; i++ { + smpls[i] = rand.Float64() + app.Add(labels.Labels{{"a", "b"}}, i, smpls[i]) + } + + testutil.Ok(t, app.Commit()) + // TODO(gouthamve): Reset the tombstones somehow. // Delete the ranges. for _, r := range c.intervals { @@ -256,9 +273,20 @@ Outer: newSeries(map[string]string{"a": "b"}, expSamples), }) + lns, err := q.LabelNames() + testutil.Ok(t, err) + lvs, err := q.LabelValues("a") + testutil.Ok(t, err) if len(expSamples) == 0 { + testutil.Equals(t, 0, len(lns)) + testutil.Equals(t, 0, len(lvs)) testutil.Assert(t, res.Next() == false, "") continue + } else { + testutil.Equals(t, 1, len(lns)) + testutil.Equals(t, 1, len(lvs)) + testutil.Equals(t, "a", lns[0]) + testutil.Equals(t, "b", lvs[0]) } for { diff --git a/head.go b/head.go index cbc8661f8..637e8477d 100644 --- a/head.go +++ b/head.go @@ -48,6 +48,10 @@ var ( // ErrOutOfBounds is returned if an appended sample is out of the // writable time range. ErrOutOfBounds = errors.New("out of bounds") + + // emptyTombstoneReader is a no-op Tombstone Reader. + // This is used by head to satisfy the Tombstones() function call. + emptyTombstoneReader = newMemTombstones() ) // Head handles reads and writes of time series data within a time window. @@ -71,8 +75,6 @@ type Head struct { values map[string]stringset // label names to possible values postings *index.MemPostings // postings lists for terms - - tombstones *memTombstones } type headMetrics struct { @@ -231,7 +233,6 @@ func NewHead(r prometheus.Registerer, l log.Logger, wal *wal.WAL, chunkRange int values: map[string]stringset{}, symbols: map[string]struct{}{}, postings: index.NewUnorderedMemPostings(), - tombstones: newMemTombstones(), } h.metrics = newHeadMetrics(h, r) @@ -334,12 +335,14 @@ func (h *Head) loadWAL(r *wal.Reader) error { } var ( - dec RecordDecoder - series []RefSeries - samples []RefSample - tstones []Stone - err error + dec RecordDecoder + series []RefSeries + samples []RefSample + tstones []Stone + allStones = newMemTombstones() + err error ) + defer allStones.Close() for r.Next() { series, samples, tstones = series[:0], samples[:0], tstones[:0] rec := r.Record() @@ -413,7 +416,7 @@ func (h *Head) loadWAL(r *wal.Reader) error { if itv.Maxt < h.minValidTime { continue } - h.tombstones.addInterval(s.ref, itv) + allStones.addInterval(s.ref, itv) } } default: @@ -436,6 +439,12 @@ func (h *Head) loadWAL(r *wal.Reader) error { } wg.Wait() + if err := allStones.Iter(func(ref uint64, dranges Intervals) error { + return h.chunkRewrite(ref, dranges) + }); err != nil { + return errors.Wrap(r.Err(), "deleting samples from tombstones") + } + if unknownRefs > 0 { level.Warn(h.logger).Log("msg", "unknown series references", "count", unknownRefs) } @@ -604,7 +613,7 @@ func (h *rangeHead) Chunks() (ChunkReader, error) { } func (h *rangeHead) Tombstones() (TombstoneReader, error) { - return h.head.tombstones, nil + return emptyTombstoneReader, nil } // initAppender is a helper to initialize the time bounds of the head @@ -849,7 +858,7 @@ func (h *Head) Delete(mint, maxt int64, ms ...labels.Matcher) error { } var stones []Stone - + dirty := false for p.Next() { series := h.series.getByID(p.At()) @@ -859,22 +868,61 @@ func (h *Head) Delete(mint, maxt int64, ms ...labels.Matcher) error { } // Delete only until the current values and not beyond. t0, t1 = clampInterval(mint, maxt, t0, t1) - stones = append(stones, Stone{p.At(), Intervals{{t0, t1}}}) + if h.wal != nil { + stones = append(stones, Stone{p.At(), Intervals{{t0, t1}}}) + } + if err := h.chunkRewrite(p.At(), Intervals{{t0, t1}}); err != nil { + return errors.Wrap(err, "delete samples") + } + dirty = true } - if p.Err() != nil { return p.Err() } var enc RecordEncoder - if h.wal != nil { + // Although we don't store the stones in the head + // we need to write them to the WAL to mark these as deleted + // after a restart while loeading the WAL. if err := h.wal.Log(enc.Tombstones(stones, nil)); err != nil { return err } } - for _, s := range stones { - h.tombstones.addInterval(s.ref, s.intervals[0]) + if dirty { + h.gc() } + + return nil +} + +// chunkRewrite re-writes the chunks which overlaps with deleted ranges +// and removes the samples in the deleted ranges. +// Chunks is deleted if no samples are left at the end. +func (h *Head) chunkRewrite(ref uint64, dranges Intervals) (err error) { + if len(dranges) == 0 { + return nil + } + + ms := h.series.getByID(ref) + ms.Lock() + defer ms.Unlock() + if len(ms.chunks) == 0 { + return nil + } + + metas := ms.chunksMetas() + mint, maxt := metas[0].MinTime, metas[len(metas)-1].MaxTime + it := newChunkSeriesIterator(metas, dranges, mint, maxt) + + ms.reset() + for it.Next() { + t, v := it.At() + ok, _ := ms.append(t, v) + if !ok { + level.Warn(h.logger).Log("msg", "failed to add sample during delete") + } + } + return nil } @@ -926,7 +974,7 @@ func (h *Head) gc() { // Tombstones returns a new reader over the head's tombstones func (h *Head) Tombstones() (TombstoneReader, error) { - return h.tombstones, nil + return emptyTombstoneReader, nil } // Index returns an IndexReader against the block. @@ -1406,6 +1454,16 @@ type memSeries struct { app chunkenc.Appender // Current appender for the chunk. } +func newMemSeries(lset labels.Labels, id uint64, chunkRange int64) *memSeries { + s := &memSeries{ + lset: lset, + ref: id, + chunkRange: chunkRange, + nextAt: math.MinInt64, + } + return s +} + func (s *memSeries) minTime() int64 { if len(s.chunks) == 0 { return math.MinInt64 @@ -1442,14 +1500,24 @@ func (s *memSeries) cut(mint int64) *memChunk { return c } -func newMemSeries(lset labels.Labels, id uint64, chunkRange int64) *memSeries { - s := &memSeries{ - lset: lset, - ref: id, - chunkRange: chunkRange, - nextAt: math.MinInt64, +func (s *memSeries) chunksMetas() []chunks.Meta { + metas := make([]chunks.Meta, 0, len(s.chunks)) + for _, chk := range s.chunks { + metas = append(metas, chunks.Meta{Chunk: chk.chunk, MinTime: chk.minTime, MaxTime: chk.maxTime}) } - return s + return metas +} + +// reset re-initialises all the variable in the memSeries except 'lset', 'ref', +// and 'chunkRange', like how it would appear after 'newMemSeries(...)'. +func (s *memSeries) reset() { + s.chunks = nil + s.headChunk = nil + s.firstChunkID = 0 + s.nextAt = math.MinInt64 + s.sampleBuf = [4]sample{} + s.pendingCommit = false + s.app = nil } // appendable checks whether the given sample is valid for appending to the series. diff --git a/head_test.go b/head_test.go index 8781f677a..cfc307f25 100644 --- a/head_test.go +++ b/head_test.go @@ -18,6 +18,7 @@ import ( "math" "math/rand" "os" + "path" "path/filepath" "sort" "testing" @@ -297,92 +298,165 @@ func TestHeadDeleteSeriesWithoutSamples(t *testing.T) { func TestHeadDeleteSimple(t *testing.T) { numSamples := int64(10) - head, err := NewHead(nil, nil, nil, 1000) - testutil.Ok(t, err) - defer head.Close() - - app := head.Appender() - - smpls := make([]float64, numSamples) - for i := int64(0); i < numSamples; i++ { - smpls[i] = rand.Float64() - app.Add(labels.Labels{{"a", "b"}}, i, smpls[i]) - } - - testutil.Ok(t, app.Commit()) cases := []struct { - intervals Intervals - remaint []int64 + dranges Intervals + remaint []int64 + remainSampbuf []int64 // Sample buffer that should remain after deletion. }{ { - intervals: Intervals{{0, 3}}, - remaint: []int64{4, 5, 6, 7, 8, 9}, + dranges: Intervals{{0, 3}}, + remaint: []int64{4, 5, 6, 7, 8, 9}, + remainSampbuf: []int64{6, 7, 8, 9}, }, { - intervals: Intervals{{1, 3}}, - remaint: []int64{0, 4, 5, 6, 7, 8, 9}, + dranges: Intervals{{1, 3}}, + remaint: []int64{0, 4, 5, 6, 7, 8, 9}, + remainSampbuf: []int64{6, 7, 8, 9}, }, { - intervals: Intervals{{1, 3}, {4, 7}}, - remaint: []int64{0, 8, 9}, + dranges: Intervals{{1, 3}, {4, 7}}, + remaint: []int64{0, 8, 9}, + remainSampbuf: []int64{0, 8, 9}, }, { - intervals: Intervals{{1, 3}, {4, 700}}, - remaint: []int64{0}, + dranges: Intervals{{1, 3}, {4, 700}}, + remaint: []int64{0}, + remainSampbuf: []int64{0}, }, - { - intervals: Intervals{{0, 9}}, - remaint: []int64{}, + { // This case is to ensure that labels and symbols are deleted. + dranges: Intervals{{0, 9}}, + remaint: []int64{}, + remainSampbuf: []int64{}, }, } Outer: for _, c := range cases { - // Reset the tombstones. - head.tombstones = newMemTombstones() + dir, err := ioutil.TempDir("", "test_wal_reload") + testutil.Ok(t, err) + defer os.RemoveAll(dir) + + w, err := wal.New(nil, nil, path.Join(dir, "wal")) + testutil.Ok(t, err) + + // Samples are deleted from head after calling head.Delete() + // and not just creating tombstones. + // Hence creating new Head for every case. + head, err := NewHead(nil, nil, w, 1000) + testutil.Ok(t, err) + + app := head.Appender() + smpls := make([]float64, numSamples) + for i := int64(0); i < numSamples; i++ { + smpls[i] = rand.Float64() + _, err = app.Add(labels.Labels{{"a", "b"}}, i, smpls[i]) + testutil.Ok(t, err) + } + testutil.Ok(t, app.Commit()) // Delete the ranges. - for _, r := range c.intervals { + for _, r := range c.dranges { testutil.Ok(t, head.Delete(r.Mint, r.Maxt, labels.NewEqualMatcher("a", "b"))) } - // Compare the result. - q, err := NewBlockQuerier(head, head.MinTime(), head.MaxTime()) + reloadedW, err := wal.New(nil, nil, w.Dir()) testutil.Ok(t, err) - res, err := q.Select(labels.NewEqualMatcher("a", "b")) + reloadedHead, err := NewHead(nil, nil, reloadedW, 1000) + // Test the head reloaded from the WAL to ensure deleted samples + // are gone even after reloading the wal file. testutil.Ok(t, err) + testutil.Ok(t, reloadedHead.Init(0)) - expSamples := make([]Sample, 0, len(c.remaint)) + expSamples := make([]sample, 0, len(c.remaint)) for _, ts := range c.remaint { expSamples = append(expSamples, sample{ts, smpls[ts]}) } - expss := newMockSeriesSet([]Series{ - newSeries(map[string]string{"a": "b"}, expSamples), - }) + // Compare the samples for both heads - before and after the reload. + for _, h := range []*Head{head, reloadedHead} { + indexr, err := h.Index() + testutil.Ok(t, err) + // We use emptyTombstoneReader explicitly to get all the samples. + css, err := LookupChunkSeries(indexr, emptyTombstoneReader, labels.NewEqualMatcher("a", "b")) + testutil.Ok(t, err) - if len(expSamples) == 0 { - testutil.Assert(t, res.Next() == false, "") - continue + // Getting the actual samples. + actSamples := make([]sample, 0, len(c.remaint)) + if len(expSamples) > 0 { + testutil.Assert(t, css.Next() == true, "") + lbls, chkMetas, intv := css.At() + testutil.Equals(t, labels.Labels{{"a", "b"}}, lbls) + testutil.Equals(t, 0, len(intv)) + + chunkr, err := h.Chunks() + testutil.Ok(t, err) + for _, meta := range chkMetas { + chk, err := chunkr.Chunk(meta.Ref) + testutil.Ok(t, err) + ii := chk.Iterator() + for ii.Next() { + t, v := ii.At() + actSamples = append(actSamples, sample{t: t, v: v}) + } + } + } + + testutil.Assert(t, css.Next() == false, "") + testutil.Ok(t, css.Err()) + testutil.Equals(t, expSamples, actSamples) } - for { - eok, rok := expss.Next(), res.Next() - testutil.Equals(t, eok, rok) + expSamplesTemp := make([]Sample, 0, len(c.remaint)) + for _, ts := range c.remaint { + expSamplesTemp = append(expSamplesTemp, sample{ts, smpls[ts]}) + } + expSeriesSet := newMockSeriesSet([]Series{ + newSeries(map[string]string{"a": "b"}, expSamplesTemp), + }) - if !eok { - continue Outer + // Compare the query results for both heads - before and after the reload. + for _, h := range []*Head{head, reloadedHead} { + q, err := NewBlockQuerier(h, h.MinTime(), h.MaxTime()) + testutil.Ok(t, err) + actSeriesSet, err := q.Select(labels.NewEqualMatcher("a", "b")) + testutil.Ok(t, err) + + lns, err := q.LabelNames() + testutil.Ok(t, err) + lvs, err := q.LabelValues("a") + testutil.Ok(t, err) + if len(expSamples) == 0 { + testutil.Equals(t, 0, len(lns)) + testutil.Equals(t, 0, len(lvs)) + testutil.Assert(t, actSeriesSet.Next() == false, "") + testutil.Ok(t, h.Close()) + continue + } else { + testutil.Equals(t, 1, len(lns)) + testutil.Equals(t, 1, len(lvs)) + testutil.Equals(t, "a", lns[0]) + testutil.Equals(t, "b", lvs[0]) } - sexp := expss.At() - sres := res.At() - testutil.Equals(t, sexp.Labels(), sres.Labels()) + for { + eok, rok := expSeriesSet.Next(), actSeriesSet.Next() + testutil.Equals(t, eok, rok) - smplExp, errExp := expandSeriesIterator(sexp.Iterator()) - smplRes, errRes := expandSeriesIterator(sres.Iterator()) + if !eok { + testutil.Ok(t, h.Close()) + continue Outer + } + expSeries := expSeriesSet.At() + actSeries := actSeriesSet.At() - testutil.Equals(t, errExp, errRes) - testutil.Equals(t, smplExp, smplRes) + testutil.Equals(t, expSeries.Labels(), actSeries.Labels()) + + smplExp, errExp := expandSeriesIterator(expSeries.Iterator()) + smplRes, errRes := expandSeriesIterator(actSeries.Iterator()) + + testutil.Equals(t, errExp, errRes) + testutil.Equals(t, smplExp, smplRes) + } } } } @@ -523,8 +597,6 @@ func TestDelete_e2e(t *testing.T) { // TODO: Add Regexp Matchers. } for _, del := range dels { - // Reset the deletes everytime. - hb.tombstones = newMemTombstones() for _, r := range del.drange { testutil.Ok(t, hb.Delete(r.Mint, r.Maxt, del.ms...)) } @@ -945,4 +1017,5 @@ func TestWalRepair(t *testing.T) { testutil.Equals(t, test.expRecs, actRec, "Wrong number of intact records") }) } + } diff --git a/testutil/testutil.go b/testutil/testutil.go index 06b9747ca..03784e7f2 100644 --- a/testutil/testutil.go +++ b/testutil/testutil.go @@ -66,6 +66,15 @@ func Equals(tb testing.TB, exp, act interface{}, msgAndArgs ...interface{}) { } } +// NotEquals fails the test if exp is equal to act. +func NotEquals(tb testing.TB, exp, act interface{}) { + if reflect.DeepEqual(exp, act) { + _, file, line, _ := runtime.Caller(1) + fmt.Printf("\033[31m%s:%d: Expected different exp and got\n\n\texp: %#v\n\n\tgot: %#v\033[39m\n\n", filepath.Base(file), line, exp, act) + tb.FailNow() + } +} + func formatMessage(msgAndArgs []interface{}) string { if len(msgAndArgs) == 0 { return ""