From 971dafdfbe41e7f0a3ec363a60ae61b56877683f Mon Sep 17 00:00:00 2001 From: Brian Brazil Date: Wed, 11 Dec 2019 21:24:03 +0000 Subject: [PATCH] Coalesce series reads where we can. When compacting rather than doing a read of all series in the index per label name, do many at once but only when it won't use (much) more ram than writing the special all index does. original in-memory postings: BenchmarkCompactionFromHead/labelnames=1,labelvalues=100000-4 1 1202383447 ns/op 158936496 B/op 1031511 allocs/op BenchmarkCompactionFromHead/labelnames=10,labelvalues=10000-4 1 1141792706 ns/op 154453408 B/op 1093453 allocs/op BenchmarkCompactionFromHead/labelnames=100,labelvalues=1000-4 1 1169288829 ns/op 161072336 B/op 1110021 allocs/op BenchmarkCompactionFromHead/labelnames=1000,labelvalues=100-4 1 1115700103 ns/op 149480472 B/op 1129180 allocs/op BenchmarkCompactionFromHead/labelnames=10000,labelvalues=10-4 1 1283813141 ns/op 162937800 B/op 1202771 allocs/op before: BenchmarkCompactionFromHead/labelnames=1,labelvalues=100000-4 1 1145195941 ns/op 131749984 B/op 834400 allocs/op BenchmarkCompactionFromHead/labelnames=10,labelvalues=10000-4 1 1233526345 ns/op 127889416 B/op 897033 allocs/op BenchmarkCompactionFromHead/labelnames=100,labelvalues=1000-4 1 1821942296 ns/op 131665648 B/op 914836 allocs/op BenchmarkCompactionFromHead/labelnames=1000,labelvalues=100-4 1 8035568665 ns/op 123811832 B/op 934312 allocs/op BenchmarkCompactionFromHead/labelnames=10000,labelvalues=10-4 1 71325926267 ns/op 140722648 B/op 1016824 allocs/op after: BenchmarkCompactionFromHead/labelnames=1,labelvalues=100000-4 1 1101429174 ns/op 129063496 B/op 832571 allocs/op BenchmarkCompactionFromHead/labelnames=10,labelvalues=10000-4 1 1074466374 ns/op 124154888 B/op 894875 allocs/op BenchmarkCompactionFromHead/labelnames=100,labelvalues=1000-4 1 1166510282 ns/op 128790648 B/op 912931 allocs/op BenchmarkCompactionFromHead/labelnames=1000,labelvalues=100-4 1 1075013071 ns/op 120570696 B/op 933511 allocs/op BenchmarkCompactionFromHead/labelnames=10000,labelvalues=10-4 1 1231673790 ns/op 138754288 B/op 1022791 allocs/op Signed-off-by: Brian Brazil --- tsdb/compact_test.go | 30 +++++++++++++++ tsdb/encoding/encoding.go | 3 +- tsdb/index/index.go | 81 +++++++++++++++++++++++++-------------- tsdb/index/index_test.go | 2 + tsdb/mocks_test.go | 1 - 5 files changed, 85 insertions(+), 32 deletions(-) diff --git a/tsdb/compact_test.go b/tsdb/compact_test.go index 91024d282..09d75c803 100644 --- a/tsdb/compact_test.go +++ b/tsdb/compact_test.go @@ -860,6 +860,36 @@ func BenchmarkCompaction(b *testing.B) { } } +func BenchmarkCompactionFromHead(b *testing.B) { + dir, err := ioutil.TempDir("", "bench_compaction_from_head") + testutil.Ok(b, err) + defer func() { + testutil.Ok(b, os.RemoveAll(dir)) + }() + totalSeries := 100000 + for labelNames := 1; labelNames < totalSeries; labelNames *= 10 { + labelValues := totalSeries / labelNames + b.Run(fmt.Sprintf("labelnames=%d,labelvalues=%d", labelNames, labelValues), func(b *testing.B) { + h, err := NewHead(nil, nil, nil, 1000) + testutil.Ok(b, err) + for ln := 0; ln < labelNames; ln++ { + app := h.Appender() + for lv := 0; lv < labelValues; lv++ { + app.Add(labels.FromStrings(fmt.Sprintf("%d", ln), fmt.Sprintf("%d%s%d", lv, postingsBenchSuffix, ln)), 0, 0) + } + testutil.Ok(b, app.Commit()) + } + + b.ResetTimer() + b.ReportAllocs() + for i := 0; i < b.N; i++ { + createBlockFromHead(b, filepath.Join(dir, fmt.Sprintf("%d-%d", i, labelNames)), h) + } + h.Close() + }) + } +} + // TestDisableAutoCompactions checks that we can // disable and enable the auto compaction. // This is needed for unit tests that rely on diff --git a/tsdb/encoding/encoding.go b/tsdb/encoding/encoding.go index 422df7abb..e0268d368 100644 --- a/tsdb/encoding/encoding.go +++ b/tsdb/encoding/encoding.go @@ -268,7 +268,7 @@ func (d *Decbuf) Byte() byte { return x } -func (d *Decbuf) EatPadding() { +func (d *Decbuf) ConsumePadding() { if d.E != nil { return } @@ -279,7 +279,6 @@ func (d *Decbuf) EatPadding() { d.E = ErrInvalidSize return } - return } func (d *Decbuf) Err() error { return d.E } diff --git a/tsdb/index/index.go b/tsdb/index/index.go index 1303f531d..d761d23bf 100644 --- a/tsdb/index/index.go +++ b/tsdb/index/index.go @@ -126,7 +126,7 @@ type Writer struct { reverseSymbols map[uint32]string labelIndexes []labelIndexHashEntry // label index offsets postings []postingsHashEntry // postings lists offsets - labelNames map[string]struct{} // label names + labelNames map[string]uint64 // label names, and their usage // Hold last series to validate that clients insert new series in order. lastSeries labels.Labels @@ -208,10 +208,8 @@ func NewWriter(fn string) (*Writer, error) { buf2: encoding.Encbuf{B: make([]byte, 0, 1<<22)}, // Caches. - symbols: make(map[string]uint32, 1<<13), - reverseSymbols: make(map[uint32]string, 1<<13), - labelNames: make(map[string]struct{}, 1<<8), - crc32: newCRC32(), + labelNames: make(map[string]uint64, 1<<8), + crc32: newCRC32(), } if err := iw.writeMeta(); err != nil { return nil, err @@ -337,7 +335,7 @@ func (w *Writer) AddSeries(ref uint64, lset labels.Labels, chunks ...chunks.Meta if !ok { return errors.Errorf("symbol entry for %q does not exist", l.Name) } - w.labelNames[l.Name] = struct{}{} + w.labelNames[l.Name]++ w.buf2.PutUvarint32(index) index, ok = w.symbols[l.Value] @@ -409,6 +407,7 @@ func (w *Writer) AddSymbols(sym map[string]struct{}) error { } w.symbols = make(map[string]uint32, len(symbols)) + w.reverseSymbols = make(map[uint32]string, len(symbols)) for index, s := range symbols { w.symbols[s] = uint32(index) @@ -596,7 +595,6 @@ func (w *Writer) writeTOC() error { } func (w *Writer) writePostings() error { - names := make([]string, 0, len(w.labelNames)) for n := range w.labelNames { names = append(names, n) @@ -617,7 +615,7 @@ func (w *Writer) writePostings() error { d := encoding.NewDecbufRaw(realByteSlice(f.Bytes()), int(w.toc.LabelIndices)) d.B = d.B[w.toc.Series:] // dec.Skip not merged yet for d.Len() > 0 { - d.EatPadding() + d.ConsumePadding() startPos := w.toc.LabelIndices - uint64(d.Len()) if startPos%16 != 0 { return errors.Errorf("series not 16-byte aligned at %d", startPos) @@ -630,32 +628,53 @@ func (w *Writer) writePostings() error { return nil } } - w.writePosting("", "", offsets) + if err := w.writePosting("", "", offsets); err != nil { + return err + } + maxPostings := uint64(len(offsets)) // No label name can have more postings than this. - for _, name := range names { - nameo := w.symbols[name] - postings := map[uint32][]uint32{} + for len(names) > 0 { + batchNames := []string{} + var c uint64 + // Try to bunch up label names into one loop, but avoid + // using more memory than a single label name can. + for len(names) > 0 { + if w.labelNames[names[0]]+c > maxPostings { + break + } + batchNames = append(batchNames, names[0]) + names = names[1:] + } + + nameSymbols := map[uint32]struct{}{} + for _, name := range batchNames { + nameSymbols[w.symbols[name]] = struct{}{} + } + // Label name -> label value -> positions. + postings := map[uint32]map[uint32][]uint32{} d := encoding.NewDecbufRaw(realByteSlice(f.Bytes()), int(w.toc.LabelIndices)) - d.B = d.B[w.toc.Series:] // dec.Skip not merged yet + d.Skip(int(w.toc.Series)) for d.Len() > 0 { - d.EatPadding() + d.ConsumePadding() startPos := w.toc.LabelIndices - uint64(d.Len()) l := d.Uvarint() // Length of this series in bytes. startLen := d.Len() - // See if this label name is in the series. + // See if label names we want are in the series. numLabels := d.Uvarint() for i := 0; i < numLabels; i++ { lno := uint32(d.Uvarint()) lvo := uint32(d.Uvarint()) - if lno == nameo { - if _, ok := postings[lvo]; !ok { - postings[lvo] = []uint32{} + if _, ok := nameSymbols[lno]; ok { + if _, ok := postings[lno]; !ok { + postings[lno] = map[uint32][]uint32{} } - postings[lvo] = append(postings[lvo], uint32(startPos/16)) - break + if _, ok := postings[lno][lvo]; !ok { + postings[lno][lvo] = []uint32{} + } + postings[lno][lvo] = append(postings[lno][lvo], uint32(startPos/16)) } } // Skip to next series. The 4 is for the CRC32. @@ -666,16 +685,20 @@ func (w *Writer) writePostings() error { } } - // Write out postings for this label name. - values := make([]uint32, 0, len(postings)) - for v := range postings { - values = append(values, v) + for _, name := range batchNames { + // Write out postings for this label name. + values := make([]uint32, 0, len(postings[w.symbols[name]])) + for v := range postings[w.symbols[name]] { + values = append(values, v) - } - // Symbol numbers are in order, so the strings will also be in order. - sort.Sort(uint32slice(values)) - for _, v := range values { - w.writePosting(name, w.reverseSymbols[v], postings[v]) + } + // Symbol numbers are in order, so the strings will also be in order. + sort.Sort(uint32slice(values)) + for _, v := range values { + if err := w.writePosting(name, w.reverseSymbols[v], postings[w.symbols[name]][v]); err != nil { + return err + } + } } } diff --git a/tsdb/index/index_test.go b/tsdb/index/index_test.go index 4328b26c3..7cd891abc 100644 --- a/tsdb/index/index_test.go +++ b/tsdb/index/index_test.go @@ -211,6 +211,7 @@ func TestIndexRW_Postings(t *testing.T) { testutil.Ok(t, iw.AddSeries(4, series[3])) err = iw.WriteLabelIndex([]string{"a"}, []string{"1"}) + testutil.Ok(t, err) err = iw.WriteLabelIndex([]string{"b"}, []string{"1", "2", "3", "4"}) testutil.Ok(t, err) @@ -266,6 +267,7 @@ func TestPostingsMany(t *testing.T) { testutil.Ok(t, iw.AddSeries(uint64(i), s)) } err = iw.WriteLabelIndex([]string{"foo"}, []string{"bar"}) + testutil.Ok(t, err) testutil.Ok(t, iw.Close()) ir, err := NewFileReader(fn) diff --git a/tsdb/mocks_test.go b/tsdb/mocks_test.go index 241802ec2..c7bb422ed 100644 --- a/tsdb/mocks_test.go +++ b/tsdb/mocks_test.go @@ -62,7 +62,6 @@ func (m *mockIndexWriter) AddSeries(ref uint64, l labels.Labels, chunks ...chunk } func (mockIndexWriter) WriteLabelIndex(names []string, values []string) error { return nil } -func (mockIndexWriter) WritePostings() error { return nil } func (mockIndexWriter) Close() error { return nil } type mockBReader struct {