From dee6981a6c78dfc28d000f86e6a6639cc37150b3 Mon Sep 17 00:00:00 2001 From: Brian Brazil Date: Tue, 17 Dec 2019 22:15:35 +0000 Subject: [PATCH] Move writing of index label indices into IndexWriter. Now you only need to provide symbols and series to IndexWriter. Signed-off-by: Brian Brazil --- tsdb/block.go | 4 --- tsdb/compact.go | 27 +-------------- tsdb/index/index.go | 73 +++++++++++++++++++++------------------- tsdb/index/index_test.go | 26 -------------- 4 files changed, 39 insertions(+), 91 deletions(-) diff --git a/tsdb/block.go b/tsdb/block.go index c4976b04e..f794dbe6a 100644 --- a/tsdb/block.go +++ b/tsdb/block.go @@ -50,10 +50,6 @@ type IndexWriter interface { // are added later. AddSeries(ref uint64, l labels.Labels, chunks ...chunks.Meta) error - // WriteLabelIndex serializes an index from label names to values. - // The passed in values chained tuples of strings of the length of names. - WriteLabelIndex(names []string, values []string) error - // Close writes any finalization and closes the resources associated with // the underlying writer. 
Close() error diff --git a/tsdb/compact.go b/tsdb/compact.go index a3dae8729..47151dad6 100644 --- a/tsdb/compact.go +++ b/tsdb/compact.go @@ -722,11 +722,6 @@ func (c *LeveledCompactor) populateBlock(blocks []BlockReader, meta *BlockMeta, symbols = newMergedStringIter(symbols, syms) } - var ( - values = map[string]stringset{} - ref = uint64(0) - ) - for symbols.Next() { if err := indexw.AddSymbol(symbols.At()); err != nil { return errors.Wrap(err, "add symbol") @@ -737,6 +732,7 @@ func (c *LeveledCompactor) populateBlock(blocks []BlockReader, meta *BlockMeta, } delIter := &deletedIterator{} + ref := uint64(0) for set.Next() { select { case <-c.ctx.Done(): @@ -836,33 +832,12 @@ func (c *LeveledCompactor) populateBlock(blocks []BlockReader, meta *BlockMeta, } } - for _, l := range lset { - valset, ok := values[l.Name] - if !ok { - valset = stringset{} - values[l.Name] = valset - } - valset.set(l.Value) - } - ref++ } if set.Err() != nil { return errors.Wrap(set.Err(), "iterate compaction set") } - s := make([]string, 0, 256) - for n, v := range values { - s = s[:0] - - for x := range v { - s = append(s, x) - } - if err := indexw.WriteLabelIndex([]string{n}, s); err != nil { - return errors.Wrap(err, "write label index") - } - } - return nil } diff --git a/tsdb/index/index.go b/tsdb/index/index.go index ed21417d2..1470c7776 100644 --- a/tsdb/index/index.go +++ b/tsdb/index/index.go @@ -70,8 +70,6 @@ const ( idxStageNone indexWriterStage = iota idxStageSymbols idxStageSeries - idxStageLabelIndex - idxStagePostings idxStageDone ) @@ -83,10 +81,6 @@ func (s indexWriterStage) String() string { return "symbols" case idxStageSeries: return "series" - case idxStageLabelIndex: - return "label index" - case idxStagePostings: - return "postings" case idxStageDone: return "done" } @@ -132,9 +126,9 @@ type Writer struct { symbolFile *fileutil.MmapFile lastSymbol string - labelIndexes []labelIndexHashEntry // label index offsets - postings []postingsHashEntry // postings lists 
offsets - labelNames map[string]uint64 // label names, and their usage + labelIndexes []labelIndexHashEntry // Label index offsets. + labelValues map[string]map[uint32]struct{} // Label names, and their values' symbol indexes. + labelNames map[string]uint64 // Label names, and their usage. // Hold last series to validate that clients insert new series in order. lastSeries labels.Labels @@ -221,9 +215,9 @@ func NewWriter(ctx context.Context, fn string) (*Writer, error) { buf1: encoding.Encbuf{B: make([]byte, 0, 1<<22)}, buf2: encoding.Encbuf{B: make([]byte, 0, 1<<22)}, - // Caches. - labelNames: make(map[string]uint64, 1<<8), - crc32: newCRC32(), + labelNames: make(map[string]uint64, 1<<8), + labelValues: make(map[string]map[uint32]struct{}, 1<<8), + crc32: newCRC32(), } if err := iw.writeMeta(); err != nil { return nil, err @@ -346,10 +340,12 @@ func (w *Writer) ensureStage(s indexWriterStage) error { } w.toc.Series = w.f.pos - case idxStageLabelIndex: - w.toc.LabelIndices = w.f.pos - case idxStageDone: + w.toc.LabelIndices = w.f.pos + if err := w.writeLabelIndices(); err != nil { + return err + } + w.toc.Postings = w.f.pos if err := w.writePostings(); err != nil { return err @@ -419,6 +415,11 @@ func (w *Writer) AddSeries(ref uint64, lset labels.Labels, chunks ...chunks.Meta return errors.Errorf("symbol entry for %q does not exist, %v", l.Value, err) } w.buf2.PutUvarint32(index) + + if _, ok := w.labelValues[l.Name]; !ok { + w.labelValues[l.Name] = map[uint32]struct{}{} + } + w.labelValues[l.Name][index] = struct{}{} } w.buf2.PutUvarint(len(chunks)) @@ -516,27 +517,34 @@ func (w *Writer) finishSymbols() error { return nil } -func (w *Writer) WriteLabelIndex(names []string, values []string) error { - if len(values)%len(names) != 0 { - return errors.Errorf("invalid value list length %d for %d names", len(values), len(names)) - } - if err := w.ensureStage(idxStageLabelIndex); err != nil { - return errors.Wrap(err, "ensure stage") +func (w *Writer) 
writeLabelIndices() error { + names := make([]string, 0, len(w.labelValues)) + for n := range w.labelValues { + names = append(names, n) } + sort.Strings(names) - valt, err := NewStringTuples(values, len(names)) - if err != nil { - return err + for _, n := range names { + values := make([]uint32, 0, len(w.labelValues[n])) + for v := range w.labelValues[n] { + values = append(values, v) + } + sort.Sort(uint32slice(values)) + if err := w.writeLabelIndex(n, values); err != nil { + return err + } } - sort.Sort(valt) + return nil +} +func (w *Writer) writeLabelIndex(name string, values []uint32) error { // Align beginning to 4 bytes for more efficient index list scans. if err := w.addPadding(4); err != nil { return err } w.labelIndexes = append(w.labelIndexes, labelIndexHashEntry{ - keys: names, + keys: []string{name}, offset: w.f.pos, }) @@ -548,21 +556,16 @@ func (w *Writer) WriteLabelIndex(names []string, values []string) error { w.crc32.Reset() w.buf1.Reset() - w.buf1.PutBE32int(len(names)) - w.buf1.PutBE32int(valt.Len()) + w.buf1.PutBE32int(1) // Number of names. 
+ w.buf1.PutBE32int(len(values)) w.buf1.WriteToHash(w.crc32) if err := w.write(w.buf1.Get()); err != nil { return err } - // here we have an index for the symbol file if v2, otherwise it's an offset - for _, v := range valt.entries { - sid, err := w.symbols.ReverseLookup(v) - if err != nil { - return errors.Errorf("symbol entry for %q does not exist: %v", v, err) - } + for _, v := range values { w.buf1.Reset() - w.buf1.PutBE32(sid) + w.buf1.PutBE32(v) w.buf1.WriteToHash(w.crc32) if err := w.write(w.buf1.Get()); err != nil { return err diff --git a/tsdb/index/index_test.go b/tsdb/index/index_test.go index 07f8f8a47..a8bf59112 100644 --- a/tsdb/index/index_test.go +++ b/tsdb/index/index_test.go @@ -83,16 +83,6 @@ func (m mockIndex) AddSeries(ref uint64, l labels.Labels, chunks ...chunks.Meta) return nil } -func (m mockIndex) WriteLabelIndex(names []string, values []string) error { - // TODO support composite indexes - if len(names) != 1 { - return errors.New("composite indexes not supported yet") - } - sort.Strings(values) - m.labelIndex[names[0]] = values - return nil -} - func (m mockIndex) Close() error { return nil } @@ -200,9 +190,6 @@ func TestIndexRW_Postings(t *testing.T) { testutil.Ok(t, iw.AddSeries(3, series[2])) testutil.Ok(t, iw.AddSeries(4, series[3])) - testutil.Ok(t, iw.WriteLabelIndex([]string{"a"}, []string{"1"})) - testutil.Ok(t, iw.WriteLabelIndex([]string{"b"}, []string{"1", "2", "3", "4"})) - testutil.Ok(t, iw.Close()) ir, err := NewFileReader(fn) @@ -289,8 +276,6 @@ func TestPostingsMany(t *testing.T) { for i, s := range series { testutil.Ok(t, iw.AddSeries(uint64(i), s)) } - err = iw.WriteLabelIndex([]string{"foo"}, []string{"bar"}) - testutil.Ok(t, err) testutil.Ok(t, iw.Close()) ir, err := NewFileReader(fn) @@ -427,17 +412,6 @@ func TestPersistence_index_e2e(t *testing.T) { postings.Add(uint64(i), s.labels) } - for k, v := range values { - var vals []string - for e := range v { - vals = append(vals, e) - } - sort.Strings(vals) - - 
testutil.Ok(t, iw.WriteLabelIndex([]string{k}, vals)) - testutil.Ok(t, mi.WriteLabelIndex([]string{k}, vals)) - } - err = iw.Close() testutil.Ok(t, err)