// Copyright 2017 The Prometheus Authors // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. // You may obtain a copy of the License at // // http://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, software // distributed under the License is distributed on an "AS IS" BASIS, // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. // See the License for the specific language governing permissions and // limitations under the License. package index import ( "context" "errors" "fmt" "hash/crc32" "os" "path/filepath" "slices" "sort" "strconv" "testing" "github.com/stretchr/testify/require" "go.uber.org/goleak" "github.com/prometheus/prometheus/model/labels" "github.com/prometheus/prometheus/storage" "github.com/prometheus/prometheus/tsdb/chunkenc" "github.com/prometheus/prometheus/tsdb/chunks" "github.com/prometheus/prometheus/tsdb/encoding" "github.com/prometheus/prometheus/util/testutil" ) func TestMain(m *testing.M) { goleak.VerifyTestMain(m) } type series struct { l labels.Labels chunks []chunks.Meta } type mockIndex struct { series map[storage.SeriesRef]series postings map[labels.Label][]storage.SeriesRef symbols map[string]struct{} } func newMockIndex() mockIndex { ix := mockIndex{ series: make(map[storage.SeriesRef]series), postings: make(map[labels.Label][]storage.SeriesRef), symbols: make(map[string]struct{}), } ix.postings[allPostingsKey] = []storage.SeriesRef{} return ix } func (m mockIndex) Symbols() (map[string]struct{}, error) { return m.symbols, nil } func (m mockIndex) AddSeries(ref storage.SeriesRef, l labels.Labels, chunks ...chunks.Meta) error { if _, ok := m.series[ref]; ok { return fmt.Errorf("series with reference %d already added", ref) } l.Range(func(lbl labels.Label) { m.symbols[lbl.Name] = struct{}{} m.symbols[lbl.Value] = struct{}{} if _, ok := m.postings[lbl]; !ok { m.postings[lbl] = []storage.SeriesRef{} } m.postings[lbl] = append(m.postings[lbl], ref) }) m.postings[allPostingsKey] = append(m.postings[allPostingsKey], ref) s := series{l: l} // Actual chunk data is not stored in the index. for _, c := range chunks { c.Chunk = nil s.chunks = append(s.chunks, c) } m.series[ref] = s return nil } func (m mockIndex) Close() error { return nil } func (m mockIndex) LabelValues(_ context.Context, name string) ([]string, error) { values := []string{} for l := range m.postings { if l.Name == name { values = append(values, l.Value) } } return values, nil } func (m mockIndex) Postings(ctx context.Context, name string, values ...string) (Postings, error) { p := []Postings{} for _, value := range values { l := labels.Label{Name: name, Value: value} p = append(p, m.SortedPostings(NewListPostings(m.postings[l]))) } return Merge(ctx, p...), nil } func (m mockIndex) SortedPostings(p Postings) Postings { ep, err := ExpandPostings(p) if err != nil { return ErrPostings(fmt.Errorf("expand postings: %w", err)) } sort.Slice(ep, func(i, j int) bool { return labels.Compare(m.series[ep[i]].l, m.series[ep[j]].l) < 0 }) return NewListPostings(ep) } func (m mockIndex) Series(ref storage.SeriesRef, builder *labels.ScratchBuilder, chks *[]chunks.Meta) error { s, ok := m.series[ref] if !ok { return errors.New("not found") } builder.Assign(s.l) *chks = append((*chks)[:0], s.chunks...) return nil } func TestIndexRW_Create_Open(t *testing.T) { dir := t.TempDir() fn := filepath.Join(dir, indexFilename) // An empty index must still result in a readable file. iw, err := NewWriter(context.Background(), fn) require.NoError(t, err) require.NoError(t, iw.Close()) ir, err := NewFileReader(fn) require.NoError(t, err) require.NoError(t, ir.Close()) // Modify magic header must cause open to fail. f, err := os.OpenFile(fn, os.O_WRONLY, 0o666) require.NoError(t, err) _, err = f.WriteAt([]byte{0, 0}, 0) require.NoError(t, err) f.Close() _, err = NewFileReader(dir) require.Error(t, err) } func TestIndexRW_Postings(t *testing.T) { ctx := context.Background() var input indexWriterSeriesSlice for i := 1; i < 5; i++ { input = append(input, &indexWriterSeries{ labels: labels.FromStrings("a", "1", "b", strconv.Itoa(i)), }) } ir, fn, _ := createFileReader(ctx, t, input) p, err := ir.Postings(ctx, "a", "1") require.NoError(t, err) var c []chunks.Meta var builder labels.ScratchBuilder for i := 0; p.Next(); i++ { err := ir.Series(p.At(), &builder, &c) require.NoError(t, err) require.Empty(t, c) testutil.RequireEqual(t, input[i].labels, builder.Labels()) } require.NoError(t, p.Err()) // The label indices are no longer used, so test them by hand here. labelValuesOffsets := map[string]uint64{} d := encoding.NewDecbufAt(ir.b, int(ir.toc.LabelIndicesTable), castagnoliTable) cnt := d.Be32() for d.Err() == nil && d.Len() > 0 && cnt > 0 { require.Equal(t, 1, d.Uvarint(), "Unexpected number of keys for label indices table") lbl := d.UvarintStr() off := d.Uvarint64() labelValuesOffsets[lbl] = off cnt-- } require.NoError(t, d.Err()) labelIndices := map[string][]string{} for lbl, off := range labelValuesOffsets { d := encoding.NewDecbufAt(ir.b, int(off), castagnoliTable) require.Equal(t, 1, d.Be32int(), "Unexpected number of label indices table names") for i := d.Be32(); i > 0 && d.Err() == nil; i-- { v, err := ir.lookupSymbol(ctx, d.Be32()) require.NoError(t, err) labelIndices[lbl] = append(labelIndices[lbl], v) } require.NoError(t, d.Err()) } require.Equal(t, map[string][]string{ "a": {"1"}, "b": {"1", "2", "3", "4"}, }, labelIndices) t.Run("ShardedPostings()", func(t *testing.T) { ir, err := NewFileReader(fn) require.NoError(t, err) t.Cleanup(func() { require.NoError(t, ir.Close()) }) // List all postings for a given label value. This is what we expect to get // in output from all shards. p, err = ir.Postings(ctx, "a", "1") require.NoError(t, err) var expected []storage.SeriesRef for p.Next() { expected = append(expected, p.At()) } require.NoError(t, p.Err()) require.NotEmpty(t, expected) // Query the same postings for each shard. const shardCount = uint64(4) actualShards := make(map[uint64][]storage.SeriesRef) actualPostings := make([]storage.SeriesRef, 0, len(expected)) for shardIndex := uint64(0); shardIndex < shardCount; shardIndex++ { p, err = ir.Postings(ctx, "a", "1") require.NoError(t, err) p = ir.ShardedPostings(p, shardIndex, shardCount) for p.Next() { ref := p.At() actualShards[shardIndex] = append(actualShards[shardIndex], ref) actualPostings = append(actualPostings, ref) } require.NoError(t, p.Err()) } // We expect the postings merged out of shards is the exact same of the non sharded ones. require.ElementsMatch(t, expected, actualPostings) // We expect the series in each shard are the expected ones. for shardIndex, ids := range actualShards { for _, id := range ids { var lbls labels.ScratchBuilder require.NoError(t, ir.Series(id, &lbls, nil)) require.Equal(t, shardIndex, labels.StableHash(lbls.Labels())%shardCount) } } }) } func TestPostingsMany(t *testing.T) { ctx := context.Background() // Create a label in the index which has 999 values. var input indexWriterSeriesSlice for i := 1; i < 1000; i++ { v := fmt.Sprintf("%03d", i) input = append(input, &indexWriterSeries{ labels: labels.FromStrings("i", v, "foo", "bar"), }) } ir, _, symbols := createFileReader(ctx, t, input) cases := []struct { in []string }{ // Simple cases, everything is present. {in: []string{"002"}}, {in: []string{"031", "032", "033"}}, {in: []string{"032", "033"}}, {in: []string{"127", "128"}}, {in: []string{"127", "128", "129"}}, {in: []string{"127", "129"}}, {in: []string{"128", "129"}}, {in: []string{"998", "999"}}, {in: []string{"999"}}, // Before actual values. {in: []string{"000"}}, {in: []string{"000", "001"}}, {in: []string{"000", "002"}}, // After actual values. {in: []string{"999a"}}, {in: []string{"999", "999a"}}, {in: []string{"998", "999", "999a"}}, // In the middle of actual values. {in: []string{"126a", "127", "128"}}, {in: []string{"127", "127a", "128"}}, {in: []string{"127", "127a", "128", "128a", "129"}}, {in: []string{"127", "128a", "129"}}, {in: []string{"128", "128a", "129"}}, {in: []string{"128", "129", "129a"}}, {in: []string{"126a", "126b", "127", "127a", "127b", "128", "128a", "128b", "129", "129a", "129b"}}, } var builder labels.ScratchBuilder for _, c := range cases { it, err := ir.Postings(ctx, "i", c.in...) require.NoError(t, err) got := []string{} var metas []chunks.Meta for it.Next() { require.NoError(t, ir.Series(it.At(), &builder, &metas)) got = append(got, builder.Labels().Get("i")) } require.NoError(t, it.Err()) exp := []string{} for _, e := range c.in { if _, ok := symbols[e]; ok && e != "l" { exp = append(exp, e) } } require.Equal(t, exp, got, fmt.Sprintf("input: %v", c.in)) } } func TestPersistence_index_e2e(t *testing.T) { ctx := context.Background() lbls, err := labels.ReadLabels(filepath.Join("..", "testdata", "20kseries.json"), 20000) require.NoError(t, err) // Sort labels as the index writer expects series in sorted order. sort.Sort(labels.Slice(lbls)) var input indexWriterSeriesSlice ref := uint64(0) // Generate ChunkMetas for every label set. for i, lset := range lbls { var metas []chunks.Meta for j := 0; j <= (i % 20); j++ { ref++ metas = append(metas, chunks.Meta{ MinTime: int64(j * 10000), MaxTime: int64((j+1)*10000) - 1, Ref: chunks.ChunkRef(ref), Chunk: chunkenc.NewXORChunk(), }) } input = append(input, &indexWriterSeries{ labels: lset, chunks: metas, }) } ir, _, _ := createFileReader(ctx, t, input) // Population procedure as done by compaction. var ( postings = NewMemPostings() values = map[string]map[string]struct{}{} ) mi := newMockIndex() for i, s := range input { require.NoError(t, mi.AddSeries(storage.SeriesRef(i), s.labels, s.chunks...)) s.labels.Range(func(l labels.Label) { valset, ok := values[l.Name] if !ok { valset = map[string]struct{}{} values[l.Name] = valset } valset[l.Value] = struct{}{} }) postings.Add(storage.SeriesRef(i), s.labels) } for p := range mi.postings { gotp, err := ir.Postings(ctx, p.Name, p.Value) require.NoError(t, err) expp, err := mi.Postings(ctx, p.Name, p.Value) require.NoError(t, err) var chks, expchks []chunks.Meta var builder, eBuilder labels.ScratchBuilder for gotp.Next() { require.True(t, expp.Next()) ref := gotp.At() err := ir.Series(ref, &builder, &chks) require.NoError(t, err) err = mi.Series(expp.At(), &eBuilder, &expchks) require.NoError(t, err) testutil.RequireEqual(t, eBuilder.Labels(), builder.Labels()) require.Equal(t, expchks, chks) } require.False(t, expp.Next(), "Expected no more postings for %q=%q", p.Name, p.Value) require.NoError(t, gotp.Err()) } labelPairs := map[string][]string{} for l := range mi.postings { labelPairs[l.Name] = append(labelPairs[l.Name], l.Value) } for k, v := range labelPairs { sort.Strings(v) res, err := ir.SortedLabelValues(ctx, k) require.NoError(t, err) require.Equal(t, len(v), len(res)) for i := 0; i < len(v); i++ { require.Equal(t, v[i], res[i]) } } gotSymbols := []string{} it := ir.Symbols() for it.Next() { gotSymbols = append(gotSymbols, it.At()) } require.NoError(t, it.Err()) expSymbols := []string{} for s := range mi.symbols { expSymbols = append(expSymbols, s) } sort.Strings(expSymbols) require.Equal(t, expSymbols, gotSymbols) } func TestWriter_ShouldReturnErrorOnSeriesWithDuplicatedLabelNames(t *testing.T) { w, err := NewWriter(context.Background(), filepath.Join(t.TempDir(), "index")) require.NoError(t, err) require.NoError(t, w.AddSymbol("__name__")) require.NoError(t, w.AddSymbol("metric_1")) require.NoError(t, w.AddSymbol("metric_2")) require.NoError(t, w.AddSeries(0, labels.FromStrings("__name__", "metric_1", "__name__", "metric_2"))) err = w.Close() require.Error(t, err) require.ErrorContains(t, err, "corruption detected when writing postings to index") } func TestDecbufUvarintWithInvalidBuffer(t *testing.T) { b := realByteSlice([]byte{0x81, 0x81, 0x81, 0x81, 0x81, 0x81}) db := encoding.NewDecbufUvarintAt(b, 0, castagnoliTable) require.Error(t, db.Err()) } func TestReaderWithInvalidBuffer(t *testing.T) { b := realByteSlice([]byte{0x81, 0x81, 0x81, 0x81, 0x81, 0x81}) _, err := NewReader(b) require.Error(t, err) } // TestNewFileReaderErrorNoOpenFiles ensures that in case of an error no file remains open. func TestNewFileReaderErrorNoOpenFiles(t *testing.T) { dir := testutil.NewTemporaryDirectory("block", t) idxName := filepath.Join(dir.Path(), "index") err := os.WriteFile(idxName, []byte("corrupted contents"), 0o666) require.NoError(t, err) _, err = NewFileReader(idxName) require.Error(t, err) // dir.Close will fail on Win if idxName fd is not closed on error path. dir.Close() } func TestSymbols(t *testing.T) { buf := encoding.Encbuf{} // Add prefix to the buffer to simulate symbols as part of larger buffer. buf.PutUvarintStr("something") symbolsStart := buf.Len() buf.PutBE32int(204) // Length of symbols table. buf.PutBE32int(100) // Number of symbols. for i := 0; i < 100; i++ { // i represents index in unicode characters table. buf.PutUvarintStr(string(rune(i))) // Symbol. } checksum := crc32.Checksum(buf.Get()[symbolsStart+4:], castagnoliTable) buf.PutBE32(checksum) // Check sum at the end. s, err := NewSymbols(realByteSlice(buf.Get()), FormatV2, symbolsStart) require.NoError(t, err) // We store only 4 offsets to symbols. require.Equal(t, 32, s.Size()) for i := 99; i >= 0; i-- { s, err := s.Lookup(uint32(i)) require.NoError(t, err) require.Equal(t, string(rune(i)), s) } _, err = s.Lookup(100) require.Error(t, err) for i := 99; i >= 0; i-- { r, err := s.ReverseLookup(string(rune(i))) require.NoError(t, err) require.Equal(t, uint32(i), r) } _, err = s.ReverseLookup(string(rune(100))) require.Error(t, err) iter := s.Iter() i := 0 for iter.Next() { require.Equal(t, string(rune(i)), iter.At()) i++ } require.NoError(t, iter.Err()) } func BenchmarkReader_ShardedPostings(b *testing.B) { const ( numSeries = 10000 numShards = 16 ) ctx := context.Background() var input indexWriterSeriesSlice for i := 1; i <= numSeries; i++ { input = append(input, &indexWriterSeries{ labels: labels.FromStrings("const", fmt.Sprintf("%10d", 1), "unique", fmt.Sprintf("%10d", i)), }) } ir, _, _ := createFileReader(ctx, b, input) b.ResetTimer() for n := 0; n < b.N; n++ { allPostings, err := ir.Postings(ctx, "const", fmt.Sprintf("%10d", 1)) require.NoError(b, err) ir.ShardedPostings(allPostings, uint64(n%numShards), numShards) } } func TestDecoder_Postings_WrongInput(t *testing.T) { _, _, err := (&Decoder{}).Postings([]byte("the cake is a lie")) require.Error(t, err) } func TestChunksRefOrdering(t *testing.T) { dir := t.TempDir() idx, err := NewWriter(context.Background(), filepath.Join(dir, "index")) require.NoError(t, err) require.NoError(t, idx.AddSymbol("1")) require.NoError(t, idx.AddSymbol("2")) require.NoError(t, idx.AddSymbol("__name__")) c50 := chunks.Meta{Ref: 50} c100 := chunks.Meta{Ref: 100} c200 := chunks.Meta{Ref: 200} require.NoError(t, idx.AddSeries(1, labels.FromStrings("__name__", "1"), c100)) require.EqualError(t, idx.AddSeries(2, labels.FromStrings("__name__", "2"), c50), "unsorted chunk reference: 50, previous: 100") require.NoError(t, idx.AddSeries(2, labels.FromStrings("__name__", "2"), c200)) require.NoError(t, idx.Close()) } func TestChunksTimeOrdering(t *testing.T) { dir := t.TempDir() idx, err := NewWriter(context.Background(), filepath.Join(dir, "index")) require.NoError(t, err) require.NoError(t, idx.AddSymbol("1")) require.NoError(t, idx.AddSymbol("2")) require.NoError(t, idx.AddSymbol("__name__")) require.NoError(t, idx.AddSeries(1, labels.FromStrings("__name__", "1"), chunks.Meta{Ref: 1, MinTime: 0, MaxTime: 10}, // Also checks that first chunk can have MinTime: 0. chunks.Meta{Ref: 2, MinTime: 11, MaxTime: 20}, chunks.Meta{Ref: 3, MinTime: 21, MaxTime: 30}, )) require.EqualError(t, idx.AddSeries(1, labels.FromStrings("__name__", "2"), chunks.Meta{Ref: 10, MinTime: 0, MaxTime: 10}, chunks.Meta{Ref: 20, MinTime: 10, MaxTime: 20}, ), "chunk minT 10 is not higher than previous chunk maxT 10") require.EqualError(t, idx.AddSeries(1, labels.FromStrings("__name__", "2"), chunks.Meta{Ref: 10, MinTime: 100, MaxTime: 30}, ), "chunk maxT 30 is less than minT 100") require.NoError(t, idx.Close()) } func TestReader_PostingsForLabelMatchingHonorsContextCancel(t *testing.T) { const seriesCount = 1000 var input indexWriterSeriesSlice for i := 1; i < seriesCount; i++ { input = append(input, &indexWriterSeries{ labels: labels.FromStrings("__name__", fmt.Sprintf("%4d", i)), chunks: []chunks.Meta{ {Ref: 1, MinTime: 0, MaxTime: 10}, }, }) } ir, _, _ := createFileReader(context.Background(), t, input) failAfter := uint64(seriesCount / 2) // Fail after processing half of the series. ctx := &testutil.MockContextErrAfter{FailAfter: failAfter} p := ir.PostingsForLabelMatching(ctx, "__name__", func(string) bool { return true }) require.Error(t, p.Err()) require.Equal(t, failAfter, ctx.Count()) } // createFileReader creates a temporary index file. It writes the provided input to this file. // It returns a Reader for this file, the file's name, and the symbol map. func createFileReader(ctx context.Context, tb testing.TB, input indexWriterSeriesSlice) (*Reader, string, map[string]struct{}) { tb.Helper() fn := filepath.Join(tb.TempDir(), indexFilename) iw, err := NewWriter(ctx, fn) require.NoError(tb, err) symbols := map[string]struct{}{} for _, s := range input { s.labels.Range(func(l labels.Label) { symbols[l.Name] = struct{}{} symbols[l.Value] = struct{}{} }) } syms := []string{} for s := range symbols { syms = append(syms, s) } slices.Sort(syms) for _, s := range syms { require.NoError(tb, iw.AddSymbol(s)) } for i, s := range input { require.NoError(tb, iw.AddSeries(storage.SeriesRef(i), s.labels, s.chunks...)) } require.NoError(tb, iw.Close()) ir, err := NewFileReader(fn) require.NoError(tb, err) tb.Cleanup(func() { require.NoError(tb, ir.Close()) }) return ir, fn, symbols }