diff --git a/tsdb/compact_test.go b/tsdb/compact_test.go index 91024d282..09d75c803 100644 --- a/tsdb/compact_test.go +++ b/tsdb/compact_test.go @@ -860,6 +860,36 @@ func BenchmarkCompaction(b *testing.B) { } } +func BenchmarkCompactionFromHead(b *testing.B) { + dir, err := ioutil.TempDir("", "bench_compaction_from_head") + testutil.Ok(b, err) + defer func() { + testutil.Ok(b, os.RemoveAll(dir)) + }() + totalSeries := 100000 + for labelNames := 1; labelNames < totalSeries; labelNames *= 10 { + labelValues := totalSeries / labelNames + b.Run(fmt.Sprintf("labelnames=%d,labelvalues=%d", labelNames, labelValues), func(b *testing.B) { + h, err := NewHead(nil, nil, nil, 1000) + testutil.Ok(b, err) + for ln := 0; ln < labelNames; ln++ { + app := h.Appender() + for lv := 0; lv < labelValues; lv++ { + app.Add(labels.FromStrings(fmt.Sprintf("%d", ln), fmt.Sprintf("%d%s%d", lv, postingsBenchSuffix, ln)), 0, 0) + } + testutil.Ok(b, app.Commit()) + } + + b.ResetTimer() + b.ReportAllocs() + for i := 0; i < b.N; i++ { + createBlockFromHead(b, filepath.Join(dir, fmt.Sprintf("%d-%d", i, labelNames)), h) + } + h.Close() + }) + } +} + // TestDisableAutoCompactions checks that we can // disable and enable the auto compaction. // This is needed for unit tests that rely on diff --git a/tsdb/encoding/encoding.go b/tsdb/encoding/encoding.go index 422df7abb..e0268d368 100644 --- a/tsdb/encoding/encoding.go +++ b/tsdb/encoding/encoding.go @@ -268,7 +268,7 @@ func (d *Decbuf) Byte() byte { return x } -func (d *Decbuf) EatPadding() { +func (d *Decbuf) ConsumePadding() { if d.E != nil { return } @@ -279,7 +279,6 @@ func (d *Decbuf) EatPadding() { d.E = ErrInvalidSize return } - return } func (d *Decbuf) Err() error { return d.E } diff --git a/tsdb/index/index.go b/tsdb/index/index.go index 1303f531d..d761d23bf 100644 --- a/tsdb/index/index.go +++ b/tsdb/index/index.go @@ -126,7 +126,7 @@ type Writer struct { reverseSymbols map[uint32]string labelIndexes []labelIndexHashEntry // label index offsets postings []postingsHashEntry // postings lists offsets - labelNames map[string]struct{} // label names + labelNames map[string]uint64 // label names, and their usage // Hold last series to validate that clients insert new series in order. lastSeries labels.Labels @@ -208,10 +208,8 @@ func NewWriter(fn string) (*Writer, error) { buf2: encoding.Encbuf{B: make([]byte, 0, 1<<22)}, // Caches. - symbols: make(map[string]uint32, 1<<13), - reverseSymbols: make(map[uint32]string, 1<<13), - labelNames: make(map[string]struct{}, 1<<8), - crc32: newCRC32(), + labelNames: make(map[string]uint64, 1<<8), + crc32: newCRC32(), } if err := iw.writeMeta(); err != nil { return nil, err @@ -337,7 +335,7 @@ func (w *Writer) AddSeries(ref uint64, lset labels.Labels, chunks ...chunks.Meta if !ok { return errors.Errorf("symbol entry for %q does not exist", l.Name) } - w.labelNames[l.Name] = struct{}{} + w.labelNames[l.Name]++ w.buf2.PutUvarint32(index) index, ok = w.symbols[l.Value] @@ -409,6 +407,7 @@ func (w *Writer) AddSymbols(sym map[string]struct{}) error { } w.symbols = make(map[string]uint32, len(symbols)) + w.reverseSymbols = make(map[uint32]string, len(symbols)) for index, s := range symbols { w.symbols[s] = uint32(index) @@ -596,7 +595,6 @@ func (w *Writer) writeTOC() error { } func (w *Writer) writePostings() error { - names := make([]string, 0, len(w.labelNames)) for n := range w.labelNames { names = append(names, n) @@ -617,7 +615,7 @@ func (w *Writer) writePostings() error { d := encoding.NewDecbufRaw(realByteSlice(f.Bytes()), int(w.toc.LabelIndices)) d.B = d.B[w.toc.Series:] // dec.Skip not merged yet for d.Len() > 0 { - d.EatPadding() + d.ConsumePadding() startPos := w.toc.LabelIndices - uint64(d.Len()) if startPos%16 != 0 { return errors.Errorf("series not 16-byte aligned at %d", startPos) @@ -630,32 +628,53 @@ func (w *Writer) writePostings() error { return nil } } - w.writePosting("", "", offsets) + if err := w.writePosting("", "", offsets); err != nil { + return err + } + maxPostings := uint64(len(offsets)) // No label name can have more postings than this. - for _, name := range names { - nameo := w.symbols[name] - postings := map[uint32][]uint32{} + for len(names) > 0 { + batchNames := []string{} + var c uint64 + // Try to bunch up label names into one loop, but avoid + // using more memory than a single label name can. + for len(names) > 0 { + if w.labelNames[names[0]]+c > maxPostings { + break + } + batchNames = append(batchNames, names[0]) + names = names[1:] + } + + nameSymbols := map[uint32]struct{}{} + for _, name := range batchNames { + nameSymbols[w.symbols[name]] = struct{}{} + } + // Label name -> label value -> positions. + postings := map[uint32]map[uint32][]uint32{} d := encoding.NewDecbufRaw(realByteSlice(f.Bytes()), int(w.toc.LabelIndices)) - d.B = d.B[w.toc.Series:] // dec.Skip not merged yet + d.Skip(int(w.toc.Series)) for d.Len() > 0 { - d.EatPadding() + d.ConsumePadding() startPos := w.toc.LabelIndices - uint64(d.Len()) l := d.Uvarint() // Length of this series in bytes. startLen := d.Len() - // See if this label name is in the series. + // See if label names we want are in the series. numLabels := d.Uvarint() for i := 0; i < numLabels; i++ { lno := uint32(d.Uvarint()) lvo := uint32(d.Uvarint()) - if lno == nameo { - if _, ok := postings[lvo]; !ok { - postings[lvo] = []uint32{} + if _, ok := nameSymbols[lno]; ok { + if _, ok := postings[lno]; !ok { + postings[lno] = map[uint32][]uint32{} } - postings[lvo] = append(postings[lvo], uint32(startPos/16)) - break + if _, ok := postings[lno][lvo]; !ok { + postings[lno][lvo] = []uint32{} + } + postings[lno][lvo] = append(postings[lno][lvo], uint32(startPos/16)) } } // Skip to next series. The 4 is for the CRC32. @@ -666,16 +685,20 @@ func (w *Writer) writePostings() error { } } - // Write out postings for this label name. - values := make([]uint32, 0, len(postings)) - for v := range postings { - values = append(values, v) + for _, name := range batchNames { + // Write out postings for this label name. + values := make([]uint32, 0, len(postings[w.symbols[name]])) + for v := range postings[w.symbols[name]] { + values = append(values, v) - } - // Symbol numbers are in order, so the strings will also be in order. - sort.Sort(uint32slice(values)) - for _, v := range values { - w.writePosting(name, w.reverseSymbols[v], postings[v]) + } + // Symbol numbers are in order, so the strings will also be in order. + sort.Sort(uint32slice(values)) + for _, v := range values { + if err := w.writePosting(name, w.reverseSymbols[v], postings[w.symbols[name]][v]); err != nil { + return err + } + } } } diff --git a/tsdb/index/index_test.go b/tsdb/index/index_test.go index 4328b26c3..7cd891abc 100644 --- a/tsdb/index/index_test.go +++ b/tsdb/index/index_test.go @@ -211,6 +211,7 @@ func TestIndexRW_Postings(t *testing.T) { testutil.Ok(t, iw.AddSeries(4, series[3])) err = iw.WriteLabelIndex([]string{"a"}, []string{"1"}) + testutil.Ok(t, err) err = iw.WriteLabelIndex([]string{"b"}, []string{"1", "2", "3", "4"}) testutil.Ok(t, err) @@ -266,6 +267,7 @@ func TestPostingsMany(t *testing.T) { testutil.Ok(t, iw.AddSeries(uint64(i), s)) } err = iw.WriteLabelIndex([]string{"foo"}, []string{"bar"}) + testutil.Ok(t, err) testutil.Ok(t, iw.Close()) ir, err := NewFileReader(fn) diff --git a/tsdb/mocks_test.go b/tsdb/mocks_test.go index 241802ec2..c7bb422ed 100644 --- a/tsdb/mocks_test.go +++ b/tsdb/mocks_test.go @@ -62,7 +62,6 @@ func (m *mockIndexWriter) AddSeries(ref uint64, l labels.Labels, chunks ...chunk } func (mockIndexWriter) WriteLabelIndex(names []string, values []string) error { return nil } -func (mockIndexWriter) WritePostings() error { return nil } func (mockIndexWriter) Close() error { return nil } type mockBReader struct {