// Copyright 2017 The Prometheus Authors // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. // You may obtain a copy of the License at // // http://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, software // distributed under the License is distributed on an "AS IS" BASIS, // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. // See the License for the specific language governing permissions and // limitations under the License. package tsdb import ( "io/ioutil" "math" "math/rand" "os" "path/filepath" "sort" "testing" prom_testutil "github.com/prometheus/client_golang/prometheus/testutil" "github.com/prometheus/tsdb/chunkenc" "github.com/prometheus/tsdb/chunks" "github.com/prometheus/tsdb/index" "github.com/prometheus/tsdb/labels" "github.com/prometheus/tsdb/testutil" "github.com/prometheus/tsdb/wal" ) func BenchmarkCreateSeries(b *testing.B) { lbls, err := labels.ReadLabels(filepath.Join("testdata", "20kseries.json"), b.N) testutil.Ok(b, err) h, err := NewHead(nil, nil, nil, 10000) testutil.Ok(b, err) defer h.Close() b.ReportAllocs() b.ResetTimer() for _, l := range lbls { h.getOrCreate(l.Hash(), l) } } func populateTestWAL(t testing.TB, w *wal.WAL, recs []interface{}) { var enc RecordEncoder for _, r := range recs { switch v := r.(type) { case []RefSeries: testutil.Ok(t, w.Log(enc.Series(v, nil))) case []RefSample: testutil.Ok(t, w.Log(enc.Samples(v, nil))) case []Stone: testutil.Ok(t, w.Log(enc.Tombstones(v, nil))) } } } func readTestWAL(t testing.TB, dir string) (recs []interface{}) { sr, err := wal.NewSegmentsReader(dir) testutil.Ok(t, err) defer sr.Close() var dec RecordDecoder r := wal.NewReader(sr) for r.Next() { rec := r.Record() switch dec.Type(rec) { case RecordSeries: series, err := dec.Series(rec, nil) testutil.Ok(t, err) recs = append(recs, series) case RecordSamples: samples, err := dec.Samples(rec, nil) testutil.Ok(t, err) recs = append(recs, samples) case RecordTombstones: tstones, err := dec.Tombstones(rec, nil) testutil.Ok(t, err) recs = append(recs, tstones) default: t.Fatalf("unknown record type") } } testutil.Ok(t, r.Err()) return recs } func TestHead_ReadWAL(t *testing.T) { entries := []interface{}{ []RefSeries{ {Ref: 10, Labels: labels.FromStrings("a", "1")}, {Ref: 11, Labels: labels.FromStrings("a", "2")}, {Ref: 100, Labels: labels.FromStrings("a", "3")}, }, []RefSample{ {Ref: 0, T: 99, V: 1}, {Ref: 10, T: 100, V: 2}, {Ref: 100, T: 100, V: 3}, }, []RefSeries{ {Ref: 50, Labels: labels.FromStrings("a", "4")}, }, []RefSample{ {Ref: 10, T: 101, V: 5}, {Ref: 50, T: 101, V: 6}, }, } dir, err := ioutil.TempDir("", "test_read_wal") testutil.Ok(t, err) defer os.RemoveAll(dir) w, err := wal.New(nil, nil, dir) testutil.Ok(t, err) defer w.Close() populateTestWAL(t, w, entries) head, err := NewHead(nil, nil, w, 1000) testutil.Ok(t, err) testutil.Ok(t, head.Init(math.MinInt64)) testutil.Equals(t, uint64(100), head.lastSeriesID) s10 := head.series.getByID(10) s11 := head.series.getByID(11) s50 := head.series.getByID(50) s100 := head.series.getByID(100) testutil.Equals(t, labels.FromStrings("a", "1"), s10.lset) testutil.Equals(t, (*memSeries)(nil), s11) // Series without samples should be garbage colected at head.Init(). testutil.Equals(t, labels.FromStrings("a", "4"), s50.lset) testutil.Equals(t, labels.FromStrings("a", "3"), s100.lset) expandChunk := func(c chunkenc.Iterator) (x []sample) { for c.Next() { t, v := c.At() x = append(x, sample{t: t, v: v}) } testutil.Ok(t, c.Err()) return x } testutil.Equals(t, []sample{{100, 2}, {101, 5}}, expandChunk(s10.iterator(0))) testutil.Equals(t, []sample{{101, 6}}, expandChunk(s50.iterator(0))) testutil.Equals(t, []sample{{100, 3}}, expandChunk(s100.iterator(0))) } func TestHead_Truncate(t *testing.T) { h, err := NewHead(nil, nil, nil, 1000) testutil.Ok(t, err) defer h.Close() h.initTime(0) s1, _ := h.getOrCreate(1, labels.FromStrings("a", "1", "b", "1")) s2, _ := h.getOrCreate(2, labels.FromStrings("a", "2", "b", "1")) s3, _ := h.getOrCreate(3, labels.FromStrings("a", "1", "b", "2")) s4, _ := h.getOrCreate(4, labels.FromStrings("a", "2", "b", "2", "c", "1")) s1.chunks = []*memChunk{ {minTime: 0, maxTime: 999}, {minTime: 1000, maxTime: 1999}, {minTime: 2000, maxTime: 2999}, } s2.chunks = []*memChunk{ {minTime: 1000, maxTime: 1999}, {minTime: 2000, maxTime: 2999}, {minTime: 3000, maxTime: 3999}, } s3.chunks = []*memChunk{ {minTime: 0, maxTime: 999}, {minTime: 1000, maxTime: 1999}, } s4.chunks = []*memChunk{} // Truncation need not be aligned. testutil.Ok(t, h.Truncate(1)) testutil.Ok(t, h.Truncate(2000)) testutil.Equals(t, []*memChunk{ {minTime: 2000, maxTime: 2999}, }, h.series.getByID(s1.ref).chunks) testutil.Equals(t, []*memChunk{ {minTime: 2000, maxTime: 2999}, {minTime: 3000, maxTime: 3999}, }, h.series.getByID(s2.ref).chunks) testutil.Assert(t, h.series.getByID(s3.ref) == nil, "") testutil.Assert(t, h.series.getByID(s4.ref) == nil, "") postingsA1, _ := index.ExpandPostings(h.postings.Get("a", "1")) postingsA2, _ := index.ExpandPostings(h.postings.Get("a", "2")) postingsB1, _ := index.ExpandPostings(h.postings.Get("b", "1")) postingsB2, _ := index.ExpandPostings(h.postings.Get("b", "2")) postingsC1, _ := index.ExpandPostings(h.postings.Get("c", "1")) postingsAll, _ := index.ExpandPostings(h.postings.Get("", "")) testutil.Equals(t, []uint64{s1.ref}, postingsA1) testutil.Equals(t, []uint64{s2.ref}, postingsA2) testutil.Equals(t, []uint64{s1.ref, s2.ref}, postingsB1) testutil.Equals(t, []uint64{s1.ref, s2.ref}, postingsAll) testutil.Assert(t, postingsB2 == nil, "") testutil.Assert(t, postingsC1 == nil, "") testutil.Equals(t, map[string]struct{}{ "": {}, // from 'all' postings list "a": {}, "b": {}, "1": {}, "2": {}, }, h.symbols) testutil.Equals(t, map[string]stringset{ "a": {"1": struct{}{}, "2": struct{}{}}, "b": {"1": struct{}{}}, "": {"": struct{}{}}, }, h.values) } // Validate various behaviors brought on by firstChunkID accounting for // garbage collected chunks. func TestMemSeries_truncateChunks(t *testing.T) { s := newMemSeries(labels.FromStrings("a", "b"), 1, 2000) for i := 0; i < 4000; i += 5 { ok, _ := s.append(int64(i), float64(i)) testutil.Assert(t, ok == true, "sample append failed") } // Check that truncate removes half of the chunks and afterwards // that the ID of the last chunk still gives us the same chunk afterwards. countBefore := len(s.chunks) lastID := s.chunkID(countBefore - 1) lastChunk := s.chunk(lastID) testutil.Assert(t, s.chunk(0) != nil, "") testutil.Assert(t, lastChunk != nil, "") s.truncateChunksBefore(2000) testutil.Equals(t, int64(2000), s.chunks[0].minTime) testutil.Assert(t, s.chunk(0) == nil, "first chunks not gone") testutil.Equals(t, countBefore/2, len(s.chunks)) testutil.Equals(t, lastChunk, s.chunk(lastID)) // Validate that the series' sample buffer is applied correctly to the last chunk // after truncation. it1 := s.iterator(s.chunkID(len(s.chunks) - 1)) _, ok := it1.(*memSafeIterator) testutil.Assert(t, ok == true, "") it2 := s.iterator(s.chunkID(len(s.chunks) - 2)) _, ok = it2.(*memSafeIterator) testutil.Assert(t, ok == false, "non-last chunk incorrectly wrapped with sample buffer") } func TestHeadDeleteSeriesWithoutSamples(t *testing.T) { entries := []interface{}{ []RefSeries{ {Ref: 10, Labels: labels.FromStrings("a", "1")}, }, []RefSample{}, []RefSeries{ {Ref: 50, Labels: labels.FromStrings("a", "2")}, }, []RefSample{ {Ref: 50, T: 80, V: 1}, {Ref: 50, T: 90, V: 1}, }, } dir, err := ioutil.TempDir("", "test_delete_series") testutil.Ok(t, err) defer os.RemoveAll(dir) w, err := wal.New(nil, nil, dir) testutil.Ok(t, err) defer w.Close() populateTestWAL(t, w, entries) head, err := NewHead(nil, nil, w, 1000) testutil.Ok(t, err) testutil.Ok(t, head.Init(math.MinInt64)) testutil.Ok(t, head.Delete(0, 100, labels.NewEqualMatcher("a", "1"))) } func TestHeadDeleteSimple(t *testing.T) { numSamples := int64(10) head, err := NewHead(nil, nil, nil, 1000) testutil.Ok(t, err) defer head.Close() app := head.Appender() smpls := make([]float64, numSamples) for i := int64(0); i < numSamples; i++ { smpls[i] = rand.Float64() app.Add(labels.Labels{{"a", "b"}}, i, smpls[i]) } testutil.Ok(t, app.Commit()) cases := []struct { intervals Intervals remaint []int64 }{ { intervals: Intervals{{0, 3}}, remaint: []int64{4, 5, 6, 7, 8, 9}, }, { intervals: Intervals{{1, 3}}, remaint: []int64{0, 4, 5, 6, 7, 8, 9}, }, { intervals: Intervals{{1, 3}, {4, 7}}, remaint: []int64{0, 8, 9}, }, { intervals: Intervals{{1, 3}, {4, 700}}, remaint: []int64{0}, }, { intervals: Intervals{{0, 9}}, remaint: []int64{}, }, } Outer: for _, c := range cases { // Reset the tombstones. head.tombstones = newMemTombstones() // Delete the ranges. for _, r := range c.intervals { testutil.Ok(t, head.Delete(r.Mint, r.Maxt, labels.NewEqualMatcher("a", "b"))) } // Compare the result. q, err := NewBlockQuerier(head, head.MinTime(), head.MaxTime()) testutil.Ok(t, err) res, err := q.Select(labels.NewEqualMatcher("a", "b")) testutil.Ok(t, err) expSamples := make([]Sample, 0, len(c.remaint)) for _, ts := range c.remaint { expSamples = append(expSamples, sample{ts, smpls[ts]}) } expss := newMockSeriesSet([]Series{ newSeries(map[string]string{"a": "b"}, expSamples), }) if len(expSamples) == 0 { testutil.Assert(t, res.Next() == false, "") continue } for { eok, rok := expss.Next(), res.Next() testutil.Equals(t, eok, rok) if !eok { continue Outer } sexp := expss.At() sres := res.At() testutil.Equals(t, sexp.Labels(), sres.Labels()) smplExp, errExp := expandSeriesIterator(sexp.Iterator()) smplRes, errRes := expandSeriesIterator(sres.Iterator()) testutil.Equals(t, errExp, errRes) testutil.Equals(t, smplExp, smplRes) } } } func TestDeleteUntilCurMax(t *testing.T) { numSamples := int64(10) hb, err := NewHead(nil, nil, nil, 1000000) testutil.Ok(t, err) defer hb.Close() app := hb.Appender() smpls := make([]float64, numSamples) for i := int64(0); i < numSamples; i++ { smpls[i] = rand.Float64() _, err := app.Add(labels.Labels{{"a", "b"}}, i, smpls[i]) testutil.Ok(t, err) } testutil.Ok(t, app.Commit()) testutil.Ok(t, hb.Delete(0, 10000, labels.NewEqualMatcher("a", "b"))) // Test the series have been deleted. q, err := NewBlockQuerier(hb, 0, 100000) testutil.Ok(t, err) res, err := q.Select(labels.NewEqualMatcher("a", "b")) testutil.Ok(t, err) testutil.Assert(t, !res.Next(), "series didn't get deleted") // Add again and test for presence. app = hb.Appender() _, err = app.Add(labels.Labels{{"a", "b"}}, 11, 1) testutil.Ok(t, err) testutil.Ok(t, app.Commit()) q, err = NewBlockQuerier(hb, 0, 100000) testutil.Ok(t, err) res, err = q.Select(labels.NewEqualMatcher("a", "b")) testutil.Ok(t, err) testutil.Assert(t, res.Next(), "series don't exist") exps := res.At() it := exps.Iterator() ressmpls, err := expandSeriesIterator(it) testutil.Ok(t, err) testutil.Equals(t, []sample{{11, 1}}, ressmpls) } func TestDelete_e2e(t *testing.T) { numDatapoints := 1000 numRanges := 1000 timeInterval := int64(2) // Create 8 series with 1000 data-points of different ranges, delete and run queries. lbls := [][]labels.Label{ { {"a", "b"}, {"instance", "localhost:9090"}, {"job", "prometheus"}, }, { {"a", "b"}, {"instance", "127.0.0.1:9090"}, {"job", "prometheus"}, }, { {"a", "b"}, {"instance", "127.0.0.1:9090"}, {"job", "prom-k8s"}, }, { {"a", "b"}, {"instance", "localhost:9090"}, {"job", "prom-k8s"}, }, { {"a", "c"}, {"instance", "localhost:9090"}, {"job", "prometheus"}, }, { {"a", "c"}, {"instance", "127.0.0.1:9090"}, {"job", "prometheus"}, }, { {"a", "c"}, {"instance", "127.0.0.1:9090"}, {"job", "prom-k8s"}, }, { {"a", "c"}, {"instance", "localhost:9090"}, {"job", "prom-k8s"}, }, } seriesMap := map[string][]Sample{} for _, l := range lbls { seriesMap[labels.New(l...).String()] = []Sample{} } dir, _ := ioutil.TempDir("", "test") defer os.RemoveAll(dir) hb, err := NewHead(nil, nil, nil, 100000) testutil.Ok(t, err) defer hb.Close() app := hb.Appender() for _, l := range lbls { ls := labels.New(l...) series := []Sample{} ts := rand.Int63n(300) for i := 0; i < numDatapoints; i++ { v := rand.Float64() _, err := app.Add(ls, ts, v) testutil.Ok(t, err) series = append(series, sample{ts, v}) ts += rand.Int63n(timeInterval) + 1 } seriesMap[labels.New(l...).String()] = series } testutil.Ok(t, app.Commit()) // Delete a time-range from each-selector. dels := []struct { ms []labels.Matcher drange Intervals }{ { ms: []labels.Matcher{labels.NewEqualMatcher("a", "b")}, drange: Intervals{{300, 500}, {600, 670}}, }, { ms: []labels.Matcher{ labels.NewEqualMatcher("a", "b"), labels.NewEqualMatcher("job", "prom-k8s"), }, drange: Intervals{{300, 500}, {100, 670}}, }, { ms: []labels.Matcher{ labels.NewEqualMatcher("a", "c"), labels.NewEqualMatcher("instance", "localhost:9090"), labels.NewEqualMatcher("job", "prometheus"), }, drange: Intervals{{300, 400}, {100, 6700}}, }, // TODO: Add Regexp Matchers. } for _, del := range dels { // Reset the deletes everytime. hb.tombstones = newMemTombstones() for _, r := range del.drange { testutil.Ok(t, hb.Delete(r.Mint, r.Maxt, del.ms...)) } matched := labels.Slice{} for _, ls := range lbls { s := labels.Selector(del.ms) if s.Matches(ls) { matched = append(matched, ls) } } sort.Sort(matched) for i := 0; i < numRanges; i++ { q, err := NewBlockQuerier(hb, 0, 100000) testutil.Ok(t, err) defer q.Close() ss, err := q.Select(del.ms...) testutil.Ok(t, err) // Build the mockSeriesSet. matchedSeries := make([]Series, 0, len(matched)) for _, m := range matched { smpls := seriesMap[m.String()] smpls = deletedSamples(smpls, del.drange) // Only append those series for which samples exist as mockSeriesSet // doesn't skip series with no samples. // TODO: But sometimes SeriesSet returns an empty SeriesIterator if len(smpls) > 0 { matchedSeries = append(matchedSeries, newSeries( m.Map(), smpls, )) } } expSs := newMockSeriesSet(matchedSeries) // Compare both SeriesSets. for { eok, rok := expSs.Next(), ss.Next() // Skip a series if iterator is empty. if rok { for !ss.At().Iterator().Next() { rok = ss.Next() if !rok { break } } } testutil.Equals(t, eok, rok) if !eok { break } sexp := expSs.At() sres := ss.At() testutil.Equals(t, sexp.Labels(), sres.Labels()) smplExp, errExp := expandSeriesIterator(sexp.Iterator()) smplRes, errRes := expandSeriesIterator(sres.Iterator()) testutil.Equals(t, errExp, errRes) testutil.Equals(t, smplExp, smplRes) } } } return } func boundedSamples(full []sample, mint, maxt int64) []sample { for len(full) > 0 { if full[0].t >= mint { break } full = full[1:] } for i, s := range full { // labels.Labelinate on the first sample larger than maxt. if s.t > maxt { return full[:i] } } // maxt is after highest sample. return full } func deletedSamples(full []Sample, dranges Intervals) []Sample { ds := make([]Sample, 0, len(full)) Outer: for _, s := range full { for _, r := range dranges { if r.inBounds(s.T()) { continue Outer } } ds = append(ds, s) } return ds } func TestComputeChunkEndTime(t *testing.T) { cases := []struct { start, cur, max int64 res int64 }{ { start: 0, cur: 250, max: 1000, res: 1000, }, { start: 100, cur: 200, max: 1000, res: 550, }, // Case where we fit floored 0 chunks. Must catch division by 0 // and default to maximum time. { start: 0, cur: 500, max: 1000, res: 1000, }, // Catch division by zero for cur == start. Strictly not a possible case. { start: 100, cur: 100, max: 1000, res: 104, }, } for _, c := range cases { got := computeChunkEndTime(c.start, c.cur, c.max) if got != c.res { t.Errorf("expected %d for (start: %d, cur: %d, max: %d), got %d", c.res, c.start, c.cur, c.max, got) } } } func TestMemSeries_append(t *testing.T) { s := newMemSeries(labels.Labels{}, 1, 500) // Add first two samples at the very end of a chunk range and the next two // on and after it. // New chunk must correctly be cut at 1000. ok, chunkCreated := s.append(998, 1) testutil.Assert(t, ok, "append failed") testutil.Assert(t, chunkCreated, "first sample created chunk") ok, chunkCreated = s.append(999, 2) testutil.Assert(t, ok, "append failed") testutil.Assert(t, !chunkCreated, "second sample should use same chunk") ok, chunkCreated = s.append(1000, 3) testutil.Assert(t, ok, "append failed") testutil.Assert(t, chunkCreated, "expected new chunk on boundary") ok, chunkCreated = s.append(1001, 4) testutil.Assert(t, ok, "append failed") testutil.Assert(t, !chunkCreated, "second sample should use same chunk") testutil.Assert(t, s.chunks[0].minTime == 998 && s.chunks[0].maxTime == 999, "wrong chunk range") testutil.Assert(t, s.chunks[1].minTime == 1000 && s.chunks[1].maxTime == 1001, "wrong chunk range") // Fill the range [1000,2000) with many samples. Intermediate chunks should be cut // at approximately 120 samples per chunk. for i := 1; i < 1000; i++ { ok, _ := s.append(1001+int64(i), float64(i)) testutil.Assert(t, ok, "append failed") } testutil.Assert(t, len(s.chunks) > 7, "expected intermediate chunks") // All chunks but the first and last should now be moderately full. for i, c := range s.chunks[1 : len(s.chunks)-1] { testutil.Assert(t, c.chunk.NumSamples() > 100, "unexpected small chunk %d of length %d", i, c.chunk.NumSamples()) } } func TestGCChunkAccess(t *testing.T) { // Put a chunk, select it. GC it and then access it. h, err := NewHead(nil, nil, nil, 1000) testutil.Ok(t, err) defer h.Close() h.initTime(0) s, _ := h.getOrCreate(1, labels.FromStrings("a", "1")) s.chunks = []*memChunk{ {minTime: 0, maxTime: 999}, {minTime: 1000, maxTime: 1999}, } idx := h.indexRange(0, 1500) var ( lset labels.Labels chunks []chunks.Meta ) testutil.Ok(t, idx.Series(1, &lset, &chunks)) testutil.Equals(t, labels.Labels{{ Name: "a", Value: "1", }}, lset) testutil.Equals(t, 2, len(chunks)) cr := h.chunksRange(0, 1500) _, err = cr.Chunk(chunks[0].Ref) testutil.Ok(t, err) _, err = cr.Chunk(chunks[1].Ref) testutil.Ok(t, err) h.Truncate(1500) // Remove a chunk. _, err = cr.Chunk(chunks[0].Ref) testutil.Equals(t, ErrNotFound, err) _, err = cr.Chunk(chunks[1].Ref) testutil.Ok(t, err) } func TestGCSeriesAccess(t *testing.T) { // Put a series, select it. GC it and then access it. h, err := NewHead(nil, nil, nil, 1000) testutil.Ok(t, err) defer h.Close() h.initTime(0) s, _ := h.getOrCreate(1, labels.FromStrings("a", "1")) s.chunks = []*memChunk{ {minTime: 0, maxTime: 999}, {minTime: 1000, maxTime: 1999}, } idx := h.indexRange(0, 2000) var ( lset labels.Labels chunks []chunks.Meta ) testutil.Ok(t, idx.Series(1, &lset, &chunks)) testutil.Equals(t, labels.Labels{{ Name: "a", Value: "1", }}, lset) testutil.Equals(t, 2, len(chunks)) cr := h.chunksRange(0, 2000) _, err = cr.Chunk(chunks[0].Ref) testutil.Ok(t, err) _, err = cr.Chunk(chunks[1].Ref) testutil.Ok(t, err) h.Truncate(2000) // Remove the series. testutil.Equals(t, (*memSeries)(nil), h.series.getByID(1)) _, err = cr.Chunk(chunks[0].Ref) testutil.Equals(t, ErrNotFound, err) _, err = cr.Chunk(chunks[1].Ref) testutil.Equals(t, ErrNotFound, err) } func TestUncommittedSamplesNotLostOnTruncate(t *testing.T) { h, err := NewHead(nil, nil, nil, 1000) testutil.Ok(t, err) defer h.Close() h.initTime(0) app := h.appender() lset := labels.FromStrings("a", "1") _, err = app.Add(lset, 2100, 1) testutil.Ok(t, err) testutil.Ok(t, h.Truncate(2000)) testutil.Assert(t, nil != h.series.getByHash(lset.Hash(), lset), "series should not have been garbage collected") testutil.Ok(t, app.Commit()) q, err := NewBlockQuerier(h, 1500, 2500) testutil.Ok(t, err) defer q.Close() ss, err := q.Select(labels.NewEqualMatcher("a", "1")) testutil.Ok(t, err) testutil.Equals(t, true, ss.Next()) } func TestRemoveSeriesAfterRollbackAndTruncate(t *testing.T) { h, err := NewHead(nil, nil, nil, 1000) testutil.Ok(t, err) defer h.Close() h.initTime(0) app := h.appender() lset := labels.FromStrings("a", "1") _, err = app.Add(lset, 2100, 1) testutil.Ok(t, err) testutil.Ok(t, h.Truncate(2000)) testutil.Assert(t, nil != h.series.getByHash(lset.Hash(), lset), "series should not have been garbage collected") testutil.Ok(t, app.Rollback()) q, err := NewBlockQuerier(h, 1500, 2500) testutil.Ok(t, err) defer q.Close() ss, err := q.Select(labels.NewEqualMatcher("a", "1")) testutil.Ok(t, err) testutil.Equals(t, false, ss.Next()) // Truncate again, this time the series should be deleted testutil.Ok(t, h.Truncate(2050)) testutil.Equals(t, (*memSeries)(nil), h.series.getByHash(lset.Hash(), lset)) } func TestHead_LogRollback(t *testing.T) { dir, err := ioutil.TempDir("", "wal_rollback") testutil.Ok(t, err) defer os.RemoveAll(dir) w, err := wal.New(nil, nil, dir) testutil.Ok(t, err) defer w.Close() h, err := NewHead(nil, nil, w, 1000) testutil.Ok(t, err) app := h.Appender() _, err = app.Add(labels.FromStrings("a", "b"), 1, 2) testutil.Ok(t, err) testutil.Ok(t, app.Rollback()) recs := readTestWAL(t, w.Dir()) testutil.Equals(t, 1, len(recs)) series, ok := recs[0].([]RefSeries) testutil.Assert(t, ok, "expected series record but got %+v", recs[0]) testutil.Equals(t, []RefSeries{{Ref: 1, Labels: labels.FromStrings("a", "b")}}, series) } func TestWalRepair(t *testing.T) { var enc RecordEncoder for name, test := range map[string]struct { corrFunc func(rec []byte) []byte // Func that applies the corruption to a record. rec []byte totalRecs int expRecs int }{ "invalid_record": { func(rec []byte) []byte { rec[0] = byte(RecordInvalid) return rec }, enc.Series([]RefSeries{{Ref: 1, Labels: labels.FromStrings("a", "b")}}, []byte{}), 9, 5, }, "decode_series": { func(rec []byte) []byte { return rec[:3] }, enc.Series([]RefSeries{{Ref: 1, Labels: labels.FromStrings("a", "b")}}, []byte{}), 9, 5, }, "decode_samples": { func(rec []byte) []byte { return rec[:3] }, enc.Samples([]RefSample{{Ref: 0, T: 99, V: 1}}, []byte{}), 9, 5, }, "decode_tombstone": { func(rec []byte) []byte { return rec[:3] }, enc.Tombstones([]Stone{{ref: 1, intervals: Intervals{}}}, []byte{}), 9, 5, }, } { t.Run(name, func(t *testing.T) { dir, err := ioutil.TempDir("", "wal_head_repair") testutil.Ok(t, err) defer os.RemoveAll(dir) w, err := wal.New(nil, nil, dir) testutil.Ok(t, err) defer w.Close() for i := 1; i <= test.totalRecs; i++ { // At this point insert a corrupted record. if i-1 == test.expRecs { testutil.Ok(t, w.Log(test.corrFunc(test.rec))) continue } testutil.Ok(t, w.Log(test.rec)) } h, err := NewHead(nil, nil, w, 1) testutil.Ok(t, err) testutil.Equals(t, 0.0, prom_testutil.ToFloat64(h.metrics.walCorruptionsTotal)) testutil.Ok(t, h.Init(math.MinInt64)) testutil.Equals(t, 1.0, prom_testutil.ToFloat64(h.metrics.walCorruptionsTotal)) sr, err := wal.NewSegmentsReader(dir) testutil.Ok(t, err) defer sr.Close() r := wal.NewReader(sr) var actRec int for r.Next() { actRec++ } testutil.Ok(t, r.Err()) testutil.Equals(t, test.expRecs, actRec, "Wrong number of intact records") }) } }