From 85964ce567b3c62db086e89b6a38d0f37b192a65 Mon Sep 17 00:00:00 2001 From: Brian Brazil Date: Tue, 17 Dec 2019 21:16:56 +0000 Subject: [PATCH 1/5] During compaction spool postings offset table on the side. This avoids building it up in memory. Signed-off-by: Brian Brazil --- tsdb/index/index.go | 112 +++++++++++++++++++++++++++++++------------- 1 file changed, 80 insertions(+), 32 deletions(-) diff --git a/tsdb/index/index.go b/tsdb/index/index.go index fb68c71a2..d28be4f72 100644 --- a/tsdb/index/index.go +++ b/tsdb/index/index.go @@ -116,6 +116,12 @@ type Writer struct { fbuf *bufio.Writer pos uint64 + // Temporary file for posting offsets table. + fPO *os.File + fbufPO *bufio.Writer + posPO uint64 + cntPO uint64 + toc TOC stage indexWriterStage @@ -193,7 +199,13 @@ func NewWriter(ctx context.Context, fn string) (*Writer, error) { return nil, errors.Wrap(err, "remove any existing index at path") } - f, err := os.OpenFile(fn, os.O_CREATE|os.O_WRONLY, 0666) + // Main index file we are building. + f, err := os.OpenFile(fn, os.O_CREATE|os.O_RDWR, 0666) + if err != nil { + return nil, err + } + // Temporary file for posting offset table. + fPO, err := os.OpenFile(fn+"_tmp_po", os.O_CREATE|os.O_RDWR, 0666) if err != nil { return nil, err } @@ -202,11 +214,14 @@ func NewWriter(ctx context.Context, fn string) (*Writer, error) { } iw := &Writer{ - ctx: ctx, - f: f, - fbuf: bufio.NewWriterSize(f, 1<<22), - pos: 0, - stage: idxStageNone, + ctx: ctx, + f: f, + fbuf: bufio.NewWriterSize(f, 1<<22), + pos: 0, + fPO: fPO, + fbufPO: bufio.NewWriterSize(fPO, 1<<22), + posPO: 0, + stage: idxStageNone, // Reusable memory. buf1: encoding.Encbuf{B: make([]byte, 0, 1<<22)}, @@ -564,39 +579,60 @@ func (w *Writer) writeLabelIndexesOffsetTable() error { // writePostingsOffsetTable writes the postings offset table. func (w *Writer) writePostingsOffsetTable() error { - startPos := w.pos - // Leave 4 bytes of space for the length, which will be calculated later. 
- if err := w.write([]byte("alen")); err != nil { + // Ensure everything is in the temporary file. + if err := w.fbufPO.Flush(); err != nil { return err } - w.crc32.Reset() w.buf1.Reset() - w.buf1.PutBE32int(len(w.postings)) - w.buf1.WriteToHash(w.crc32) + w.buf1.PutBE32int(int(w.posPO) + 4) // Length, including the count. if err := w.write(w.buf1.Get()); err != nil { return err } - for _, e := range w.postings { - w.buf1.Reset() - w.buf1.PutUvarint(2) - w.buf1.PutUvarintStr(e.name) - w.buf1.PutUvarintStr(e.value) - w.buf1.PutUvarint64(e.offset) - w.buf1.WriteToHash(w.crc32) - if err := w.write(w.buf1.Get()); err != nil { - return err - } - } - - // Write out the length. w.buf1.Reset() - w.buf1.PutBE32int(int(w.pos - startPos - 4)) - if err := w.writeAt(w.buf1.Get(), startPos); err != nil { + w.crc32.Reset() + w.buf1.PutBE32int(int(w.cntPO)) // Count. + w.buf1.WriteToHash(w.crc32) + if err := w.write(w.buf1.Get()); err != nil { + return err + } + // Copy temporary file into main index. + if _, err := w.fPO.Seek(0, 0); err != nil { return err } + buf := make([]byte, 1<<20) + l := 0 + for { + n, err := w.fPO.Read(buf) + if err != nil && err != io.EOF { + return err + } + if n == 0 { + break + } + l += n + w.crc32.Write(buf[:n]) + if err := w.write(buf[:n]); err != nil { + return err + } + } + if w.posPO != uint64(l) { + return errors.Errorf("wrote %d bytes to posting offset temporary file, but only read back %d", w.posPO, l) + } + + // Cleanup temporary file. + name := w.fPO.Name() + if err := w.fPO.Close(); err != nil { + return err + } + w.fPO = nil + if err := os.Remove(name); err != nil { + return err + } + + // Finally write the hash. 
w.buf1.Reset() w.buf1.PutHashSum(w.crc32) return w.write(w.buf1.Get()) @@ -749,11 +785,18 @@ func (w *Writer) writePosting(name, value string, offs []uint32) error { return err } - w.postings = append(w.postings, postingsHashEntry{ - name: name, - value: value, - offset: w.pos, - }) + // Write out postings offset table to temporary file as we go. + w.buf1.Reset() + w.buf1.PutUvarint(2) + w.buf1.PutUvarintStr(name) + w.buf1.PutUvarintStr(value) + w.buf1.PutUvarint64(w.pos) + if n, err := w.fbufPO.Write(w.buf1.Get()); err != nil { + return err + } else { + w.posPO += uint64(n) + } + w.cntPO++ w.buf1.Reset() w.buf1.PutBE32int(len(offs)) @@ -796,6 +839,11 @@ func (w *Writer) Close() error { return err } } + if w.fPO != nil { + if err := w.fPO.Close(); err != nil { + return err + } + } if err := w.fbuf.Flush(); err != nil { return err } From 1733724e304b6e201a718ed8eb04da06cfd2aeaf Mon Sep 17 00:00:00 2001 From: Brian Brazil Date: Tue, 17 Dec 2019 21:54:13 +0000 Subject: [PATCH 2/5] Factor out index file writing code. Now that we have more than one file open at a time, deduplicate a bit. Signed-off-by: Brian Brazil --- tsdb/index/index.go | 173 ++++++++++++++++++++++++++------------------ 1 file changed, 104 insertions(+), 69 deletions(-) diff --git a/tsdb/index/index.go b/tsdb/index/index.go index d28be4f72..ed21417d2 100644 --- a/tsdb/index/index.go +++ b/tsdb/index/index.go @@ -111,16 +111,14 @@ func newCRC32() hash.Hash32 { // Writer implements the IndexWriter interface for the standard // serialization format. type Writer struct { - ctx context.Context - f *os.File - fbuf *bufio.Writer - pos uint64 + ctx context.Context + + // For the main index file. + f *fileWriter // Temporary file for posting offsets table. - fPO *os.File - fbufPO *bufio.Writer - posPO uint64 - cntPO uint64 + fPO *fileWriter + cntPO uint64 toc TOC stage indexWriterStage @@ -200,12 +198,12 @@ func NewWriter(ctx context.Context, fn string) (*Writer, error) { } // Main index file we are building. 
- f, err := os.OpenFile(fn, os.O_CREATE|os.O_RDWR, 0666) + f, err := newFileWriter(fn) if err != nil { return nil, err } // Temporary file for posting offset table. - fPO, err := os.OpenFile(fn+"_tmp_po", os.O_CREATE|os.O_RDWR, 0666) + fPO, err := newFileWriter(fn + "_tmp_po") if err != nil { return nil, err } @@ -214,14 +212,10 @@ func NewWriter(ctx context.Context, fn string) (*Writer, error) { } iw := &Writer{ - ctx: ctx, - f: f, - fbuf: bufio.NewWriterSize(f, 1<<22), - pos: 0, - fPO: fPO, - fbufPO: bufio.NewWriterSize(fPO, 1<<22), - posPO: 0, - stage: idxStageNone, + ctx: ctx, + f: f, + fPO: fPO, + stage: idxStageNone, // Reusable memory. buf1: encoding.Encbuf{B: make([]byte, 0, 1<<22)}, @@ -238,9 +232,41 @@ func NewWriter(ctx context.Context, fn string) (*Writer, error) { } func (w *Writer) write(bufs ...[]byte) error { + return w.f.write(bufs...) +} + +func (w *Writer) writeAt(buf []byte, pos uint64) error { + return w.f.writeAt(buf, pos) +} + +func (w *Writer) addPadding(size int) error { + return w.f.addPadding(size) +} + +type fileWriter struct { + f *os.File + fbuf *bufio.Writer + pos uint64 + name string +} + +func newFileWriter(name string) (*fileWriter, error) { + f, err := os.OpenFile(name, os.O_CREATE|os.O_RDWR, 0666) + if err != nil { + return nil, err + } + return &fileWriter{ + f: f, + fbuf: bufio.NewWriterSize(f, 1<<22), + pos: 0, + name: name, + }, nil +} + +func (fw *fileWriter) write(bufs ...[]byte) error { for _, b := range bufs { - n, err := w.fbuf.Write(b) - w.pos += uint64(n) + n, err := fw.fbuf.Write(b) + fw.pos += uint64(n) if err != nil { return err } @@ -248,29 +274,47 @@ func (w *Writer) write(bufs ...[]byte) error { // offset references in v1 are only 4 bytes large. // Once we move to compressed/varint representations in those areas, this limitation // can be lifted. 
- if w.pos > 16*math.MaxUint32 { + if fw.pos > 16*math.MaxUint32 { return errors.Errorf("exceeding max size of 64GiB") } } return nil } -func (w *Writer) writeAt(buf []byte, pos uint64) error { - if err := w.fbuf.Flush(); err != nil { +func (fw *fileWriter) flush() error { + return fw.fbuf.Flush() +} + +func (fw *fileWriter) writeAt(buf []byte, pos uint64) error { + if err := fw.flush(); err != nil { return err } - _, err := w.f.WriteAt(buf, int64(pos)) + _, err := fw.f.WriteAt(buf, int64(pos)) return err } // addPadding adds zero byte padding until the file size is a multiple size. -func (w *Writer) addPadding(size int) error { - p := w.pos % uint64(size) +func (fw *fileWriter) addPadding(size int) error { + p := fw.pos % uint64(size) if p == 0 { return nil } p = uint64(size) - p - return errors.Wrap(w.write(make([]byte, p)), "add padding") + return errors.Wrap(fw.write(make([]byte, p)), "add padding") +} + +func (fw *fileWriter) close() error { + if err := fw.flush(); err != nil { + return err + } + if err := fw.f.Sync(); err != nil { + return err + } + return fw.f.Close() +} + +func (fw *fileWriter) remove() error { + return os.Remove(fw.name) } // ensureStage handles transitions between write stages and ensures that IndexWriter @@ -292,7 +336,7 @@ func (w *Writer) ensureStage(s indexWriterStage) error { // Mark start of sections in table of contents. 
switch s { case idxStageSymbols: - w.toc.Symbols = w.pos + w.toc.Symbols = w.f.pos if err := w.startSymbols(); err != nil { return err } @@ -300,22 +344,22 @@ func (w *Writer) ensureStage(s indexWriterStage) error { if err := w.finishSymbols(); err != nil { return err } - w.toc.Series = w.pos + w.toc.Series = w.f.pos case idxStageLabelIndex: - w.toc.LabelIndices = w.pos + w.toc.LabelIndices = w.f.pos case idxStageDone: - w.toc.Postings = w.pos + w.toc.Postings = w.f.pos if err := w.writePostings(); err != nil { return err } - w.toc.LabelIndicesTable = w.pos + w.toc.LabelIndicesTable = w.f.pos if err := w.writeLabelIndexesOffsetTable(); err != nil { return err } - w.toc.PostingsTable = w.pos + w.toc.PostingsTable = w.f.pos if err := w.writePostingsOffsetTable(); err != nil { return err } @@ -354,8 +398,8 @@ func (w *Writer) AddSeries(ref uint64, lset labels.Labels, chunks ...chunks.Meta return errors.Errorf("failed to write padding bytes: %v", err) } - if w.pos%16 != 0 { - return errors.Errorf("series write not 16-byte aligned at %d", w.pos) + if w.f.pos%16 != 0 { + return errors.Errorf("series write not 16-byte aligned at %d", w.f.pos) } w.buf2.Reset() @@ -436,24 +480,24 @@ func (w *Writer) AddSymbol(sym string) error { func (w *Writer) finishSymbols() error { // Write out the length and symbol count. w.buf1.Reset() - w.buf1.PutBE32int(int(w.pos - w.toc.Symbols - 4)) + w.buf1.PutBE32int(int(w.f.pos - w.toc.Symbols - 4)) w.buf1.PutBE32int(int(w.numSymbols)) if err := w.writeAt(w.buf1.Get(), w.toc.Symbols); err != nil { return err } - hashPos := w.pos + hashPos := w.f.pos // Leave space for the hash. We can only calculate it // now that the number of symbols is known, so mmap and do it from there. 
if err := w.write([]byte("hash")); err != nil { return err } - if err := w.fbuf.Flush(); err != nil { + if err := w.f.flush(); err != nil { return err } var err error - w.symbolFile, err = fileutil.OpenMmapFile(w.f.Name()) + w.symbolFile, err = fileutil.OpenMmapFile(w.f.name) if err != nil { return err } @@ -493,10 +537,10 @@ func (w *Writer) WriteLabelIndex(names []string, values []string) error { w.labelIndexes = append(w.labelIndexes, labelIndexHashEntry{ keys: names, - offset: w.pos, + offset: w.f.pos, }) - startPos := w.pos + startPos := w.f.pos // Leave 4 bytes of space for the length, which will be calculated later. if err := w.write([]byte("alen")); err != nil { return err @@ -527,7 +571,7 @@ func (w *Writer) WriteLabelIndex(names []string, values []string) error { // Write out the length. w.buf1.Reset() - w.buf1.PutBE32int(int(w.pos - startPos - 4)) + w.buf1.PutBE32int(int(w.f.pos - startPos - 4)) if err := w.writeAt(w.buf1.Get(), startPos); err != nil { return err } @@ -539,7 +583,7 @@ func (w *Writer) WriteLabelIndex(names []string, values []string) error { // writeLabelIndexesOffsetTable writes the label indices offset table. func (w *Writer) writeLabelIndexesOffsetTable() error { - startPos := w.pos + startPos := w.f.pos // Leave 4 bytes of space for the length, which will be calculated later. if err := w.write([]byte("alen")); err != nil { return err @@ -567,7 +611,7 @@ func (w *Writer) writeLabelIndexesOffsetTable() error { } // Write out the length. w.buf1.Reset() - w.buf1.PutBE32int(int(w.pos - startPos - 4)) + w.buf1.PutBE32int(int(w.f.pos - startPos - 4)) if err := w.writeAt(w.buf1.Get(), startPos); err != nil { return err } @@ -580,12 +624,12 @@ func (w *Writer) writeLabelIndexesOffsetTable() error { // writePostingsOffsetTable writes the postings offset table. func (w *Writer) writePostingsOffsetTable() error { // Ensure everything is in the temporary file. 
- if err := w.fbufPO.Flush(); err != nil { + if err := w.fPO.flush(); err != nil { return err } w.buf1.Reset() - w.buf1.PutBE32int(int(w.posPO) + 4) // Length, including the count. + w.buf1.PutBE32int(int(w.fPO.pos) + 4) // Length, including the count. if err := w.write(w.buf1.Get()); err != nil { return err } @@ -598,14 +642,14 @@ func (w *Writer) writePostingsOffsetTable() error { return err } // Copy temporary file into main index. - if _, err := w.fPO.Seek(0, 0); err != nil { + if _, err := w.fPO.f.Seek(0, 0); err != nil { return err } buf := make([]byte, 1<<20) l := 0 for { - n, err := w.fPO.Read(buf) + n, err := w.fPO.f.Read(buf) if err != nil && err != io.EOF { return err } @@ -618,19 +662,18 @@ func (w *Writer) writePostingsOffsetTable() error { return err } } - if w.posPO != uint64(l) { - return errors.Errorf("wrote %d bytes to posting offset temporary file, but only read back %d", w.posPO, l) + if w.fPO.pos != uint64(l) { + return errors.Errorf("wrote %d bytes to posting offset temporary file, but only read back %d", w.fPO.pos, l) } // Cleanup temporary file. - name := w.fPO.Name() - if err := w.fPO.Close(); err != nil { + if err := w.fPO.close(); err != nil { + return err + } + if err := w.fPO.remove(); err != nil { return err } w.fPO = nil - if err := os.Remove(name); err != nil { - return err - } // Finally write the hash. 
w.buf1.Reset() @@ -662,10 +705,10 @@ func (w *Writer) writePostings() error { } sort.Strings(names) - if err := w.fbuf.Flush(); err != nil { + if err := w.f.flush(); err != nil { return err } - f, err := fileutil.OpenMmapFile(w.f.Name()) + f, err := fileutil.OpenMmapFile(w.f.name) if err != nil { return err } @@ -790,11 +833,9 @@ func (w *Writer) writePosting(name, value string, offs []uint32) error { w.buf1.PutUvarint(2) w.buf1.PutUvarintStr(name) w.buf1.PutUvarintStr(value) - w.buf1.PutUvarint64(w.pos) - if n, err := w.fbufPO.Write(w.buf1.Get()); err != nil { + w.buf1.PutUvarint64(w.f.pos) + if err := w.fPO.write(w.buf1.Get()); err != nil { return err - } else { - w.posPO += uint64(n) } w.cntPO++ @@ -840,17 +881,11 @@ func (w *Writer) Close() error { } } if w.fPO != nil { - if err := w.fPO.Close(); err != nil { + if err := w.fPO.close(); err != nil { return err } } - if err := w.fbuf.Flush(); err != nil { - return err - } - if err := w.f.Sync(); err != nil { - return err - } - return w.f.Close() + return w.f.close() } // StringTuples provides access to a sorted list of string tuples. From dee6981a6c78dfc28d000f86e6a6639cc37150b3 Mon Sep 17 00:00:00 2001 From: Brian Brazil Date: Tue, 17 Dec 2019 22:15:35 +0000 Subject: [PATCH 3/5] Move writing of index label indices into IndexWriter. Now you only need to provide symbols and series to IndexWriter. Signed-off-by: Brian Brazil --- tsdb/block.go | 4 --- tsdb/compact.go | 27 +-------------- tsdb/index/index.go | 73 +++++++++++++++++++++------------------- tsdb/index/index_test.go | 26 -------------- 4 files changed, 39 insertions(+), 91 deletions(-) diff --git a/tsdb/block.go b/tsdb/block.go index c4976b04e..f794dbe6a 100644 --- a/tsdb/block.go +++ b/tsdb/block.go @@ -50,10 +50,6 @@ type IndexWriter interface { // are added later. AddSeries(ref uint64, l labels.Labels, chunks ...chunks.Meta) error - // WriteLabelIndex serializes an index from label names to values. 
- // The passed in values chained tuples of strings of the length of names. - WriteLabelIndex(names []string, values []string) error - // Close writes any finalization and closes the resources associated with // the underlying writer. Close() error diff --git a/tsdb/compact.go b/tsdb/compact.go index a3dae8729..47151dad6 100644 --- a/tsdb/compact.go +++ b/tsdb/compact.go @@ -722,11 +722,6 @@ func (c *LeveledCompactor) populateBlock(blocks []BlockReader, meta *BlockMeta, symbols = newMergedStringIter(symbols, syms) } - var ( - values = map[string]stringset{} - ref = uint64(0) - ) - for symbols.Next() { if err := indexw.AddSymbol(symbols.At()); err != nil { return errors.Wrap(err, "add symbol") @@ -737,6 +732,7 @@ func (c *LeveledCompactor) populateBlock(blocks []BlockReader, meta *BlockMeta, } delIter := &deletedIterator{} + ref := uint64(0) for set.Next() { select { case <-c.ctx.Done(): @@ -836,33 +832,12 @@ func (c *LeveledCompactor) populateBlock(blocks []BlockReader, meta *BlockMeta, } } - for _, l := range lset { - valset, ok := values[l.Name] - if !ok { - valset = stringset{} - values[l.Name] = valset - } - valset.set(l.Value) - } - ref++ } if set.Err() != nil { return errors.Wrap(set.Err(), "iterate compaction set") } - s := make([]string, 0, 256) - for n, v := range values { - s = s[:0] - - for x := range v { - s = append(s, x) - } - if err := indexw.WriteLabelIndex([]string{n}, s); err != nil { - return errors.Wrap(err, "write label index") - } - } - return nil } diff --git a/tsdb/index/index.go b/tsdb/index/index.go index ed21417d2..1470c7776 100644 --- a/tsdb/index/index.go +++ b/tsdb/index/index.go @@ -70,8 +70,6 @@ const ( idxStageNone indexWriterStage = iota idxStageSymbols idxStageSeries - idxStageLabelIndex - idxStagePostings idxStageDone ) @@ -83,10 +81,6 @@ func (s indexWriterStage) String() string { return "symbols" case idxStageSeries: return "series" - case idxStageLabelIndex: - return "label index" - case idxStagePostings: - return "postings" 
case idxStageDone: return "done" } @@ -132,9 +126,9 @@ type Writer struct { symbolFile *fileutil.MmapFile lastSymbol string - labelIndexes []labelIndexHashEntry // label index offsets - postings []postingsHashEntry // postings lists offsets - labelNames map[string]uint64 // label names, and their usage + labelIndexes []labelIndexHashEntry // Label index offsets. + labelValues map[string]map[uint32]struct{} // Label names, and their values's symbol indexes. + labelNames map[string]uint64 // Label names, and their usage. // Hold last series to validate that clients insert new series in order. lastSeries labels.Labels @@ -221,9 +215,9 @@ func NewWriter(ctx context.Context, fn string) (*Writer, error) { buf1: encoding.Encbuf{B: make([]byte, 0, 1<<22)}, buf2: encoding.Encbuf{B: make([]byte, 0, 1<<22)}, - // Caches. - labelNames: make(map[string]uint64, 1<<8), - crc32: newCRC32(), + labelNames: make(map[string]uint64, 1<<8), + labelValues: make(map[string]map[uint32]struct{}, 1<<8), + crc32: newCRC32(), } if err := iw.writeMeta(); err != nil { return nil, err @@ -346,10 +340,12 @@ func (w *Writer) ensureStage(s indexWriterStage) error { } w.toc.Series = w.f.pos - case idxStageLabelIndex: - w.toc.LabelIndices = w.f.pos - case idxStageDone: + w.toc.LabelIndices = w.f.pos + if err := w.writeLabelIndices(); err != nil { + return err + } + w.toc.Postings = w.f.pos if err := w.writePostings(); err != nil { return err @@ -419,6 +415,11 @@ func (w *Writer) AddSeries(ref uint64, lset labels.Labels, chunks ...chunks.Meta return errors.Errorf("symbol entry for %q does not exist, %v", l.Value, err) } w.buf2.PutUvarint32(index) + + if _, ok := w.labelValues[l.Name]; !ok { + w.labelValues[l.Name] = map[uint32]struct{}{} + } + w.labelValues[l.Name][index] = struct{}{} } w.buf2.PutUvarint(len(chunks)) @@ -516,27 +517,34 @@ func (w *Writer) finishSymbols() error { return nil } -func (w *Writer) WriteLabelIndex(names []string, values []string) error { - if len(values)%len(names) != 0 { - 
return errors.Errorf("invalid value list length %d for %d names", len(values), len(names)) - } - if err := w.ensureStage(idxStageLabelIndex); err != nil { - return errors.Wrap(err, "ensure stage") +func (w *Writer) writeLabelIndices() error { + names := make([]string, 0, len(w.labelValues)) + for n := range w.labelValues { + names = append(names, n) } + sort.Strings(names) - valt, err := NewStringTuples(values, len(names)) - if err != nil { - return err + for _, n := range names { + values := make([]uint32, 0, len(w.labelValues[n])) + for v := range w.labelValues[n] { + values = append(values, v) + } + sort.Sort(uint32slice(values)) + if err := w.writeLabelIndex(n, values); err != nil { + return err + } } - sort.Sort(valt) + return nil +} +func (w *Writer) writeLabelIndex(name string, values []uint32) error { // Align beginning to 4 bytes for more efficient index list scans. if err := w.addPadding(4); err != nil { return err } w.labelIndexes = append(w.labelIndexes, labelIndexHashEntry{ - keys: names, + keys: []string{name}, offset: w.f.pos, }) @@ -548,21 +556,16 @@ func (w *Writer) WriteLabelIndex(names []string, values []string) error { w.crc32.Reset() w.buf1.Reset() - w.buf1.PutBE32int(len(names)) - w.buf1.PutBE32int(valt.Len()) + w.buf1.PutBE32int(1) // Number of names. 
+ w.buf1.PutBE32int(len(values)) w.buf1.WriteToHash(w.crc32) if err := w.write(w.buf1.Get()); err != nil { return err } - // here we have an index for the symbol file if v2, otherwise it's an offset - for _, v := range valt.entries { - sid, err := w.symbols.ReverseLookup(v) - if err != nil { - return errors.Errorf("symbol entry for %q does not exist: %v", v, err) - } + for _, v := range values { w.buf1.Reset() - w.buf1.PutBE32(sid) + w.buf1.PutBE32(v) w.buf1.WriteToHash(w.crc32) if err := w.write(w.buf1.Get()); err != nil { return err diff --git a/tsdb/index/index_test.go b/tsdb/index/index_test.go index 07f8f8a47..a8bf59112 100644 --- a/tsdb/index/index_test.go +++ b/tsdb/index/index_test.go @@ -83,16 +83,6 @@ func (m mockIndex) AddSeries(ref uint64, l labels.Labels, chunks ...chunks.Meta) return nil } -func (m mockIndex) WriteLabelIndex(names []string, values []string) error { - // TODO support composite indexes - if len(names) != 1 { - return errors.New("composite indexes not supported yet") - } - sort.Strings(values) - m.labelIndex[names[0]] = values - return nil -} - func (m mockIndex) Close() error { return nil } @@ -200,9 +190,6 @@ func TestIndexRW_Postings(t *testing.T) { testutil.Ok(t, iw.AddSeries(3, series[2])) testutil.Ok(t, iw.AddSeries(4, series[3])) - testutil.Ok(t, iw.WriteLabelIndex([]string{"a"}, []string{"1"})) - testutil.Ok(t, iw.WriteLabelIndex([]string{"b"}, []string{"1", "2", "3", "4"})) - testutil.Ok(t, iw.Close()) ir, err := NewFileReader(fn) @@ -289,8 +276,6 @@ func TestPostingsMany(t *testing.T) { for i, s := range series { testutil.Ok(t, iw.AddSeries(uint64(i), s)) } - err = iw.WriteLabelIndex([]string{"foo"}, []string{"bar"}) - testutil.Ok(t, err) testutil.Ok(t, iw.Close()) ir, err := NewFileReader(fn) @@ -427,17 +412,6 @@ func TestPersistence_index_e2e(t *testing.T) { postings.Add(uint64(i), s.labels) } - for k, v := range values { - var vals []string - for e := range v { - vals = append(vals, e) - } - sort.Strings(vals) - - 
testutil.Ok(t, iw.WriteLabelIndex([]string{k}, vals)) - testutil.Ok(t, mi.WriteLabelIndex([]string{k}, vals)) - } - err = iw.Close() testutil.Ok(t, err) From 7d1aad46b8af9d6128f0777d9ab9477573dbd09f Mon Sep 17 00:00:00 2001 From: Brian Brazil Date: Wed, 18 Dec 2019 00:55:29 +0000 Subject: [PATCH 4/5] Put postings in a temporary file during index writing. Signed-off-by: Brian Brazil --- tsdb/index/index.go | 118 ++++++++++++++++++++++++++++++++++---------- 1 file changed, 91 insertions(+), 27 deletions(-) diff --git a/tsdb/index/index.go b/tsdb/index/index.go index 1470c7776..06c7e2563 100644 --- a/tsdb/index/index.go +++ b/tsdb/index/index.go @@ -110,6 +110,8 @@ type Writer struct { // For the main index file. f *fileWriter + // Temporary file for postings. + fP *fileWriter // Temporary file for posting offsets table. fPO *fileWriter cntPO uint64 @@ -196,6 +198,11 @@ func NewWriter(ctx context.Context, fn string) (*Writer, error) { if err != nil { return nil, err } + // Temporary file for postings. + fP, err := newFileWriter(fn + "_tmp_p") + if err != nil { + return nil, err + } // Temporary file for posting offset table. fPO, err := newFileWriter(fn + "_tmp_po") if err != nil { @@ -208,6 +215,7 @@ func NewWriter(ctx context.Context, fn string) (*Writer, error) { iw := &Writer{ ctx: ctx, f: f, + fP: fP, fPO: fPO, stage: idxStageNone, @@ -323,6 +331,10 @@ func (w *Writer) ensureStage(s indexWriterStage) error { if w.stage == s { return nil } + if w.stage+1 < s { + // A stage has been skipped. + w.ensureStage(s - 1) + } if w.stage > s { return errors.Errorf("invalid stage %q, currently at %q", s, w.stage) } @@ -342,6 +354,11 @@ func (w *Writer) ensureStage(s indexWriterStage) error { case idxStageDone: w.toc.LabelIndices = w.f.pos + // LabelIndices generation depends on the posting offset + // table produced at this stage. 
+ if err := w.writePostingsToTmpFiles(); err != nil { + return err + } if err := w.writeLabelIndices(); err != nil { return err } @@ -355,6 +372,7 @@ func (w *Writer) ensureStage(s indexWriterStage) error { if err := w.writeLabelIndexesOffsetTable(); err != nil { return err } + w.toc.PostingsTable = w.f.pos if err := w.writePostingsOffsetTable(); err != nil { return err @@ -631,12 +649,16 @@ func (w *Writer) writePostingsOffsetTable() error { return err } - w.buf1.Reset() - w.buf1.PutBE32int(int(w.fPO.pos) + 4) // Length, including the count. - if err := w.write(w.buf1.Get()); err != nil { + startPos := w.f.pos + // Leave 4 bytes of space for the length, which will be calculated later. + if err := w.write([]byte("alen")); err != nil { return err } + // Copy over the tmp posting offset table, however we need to + // adjust the offsets. + adjustment := w.toc.Postings + w.buf1.Reset() w.crc32.Reset() w.buf1.PutBE32int(int(w.cntPO)) // Count. @@ -644,29 +666,28 @@ func (w *Writer) writePostingsOffsetTable() error { if err := w.write(w.buf1.Get()); err != nil { return err } - // Copy temporary file into main index. - if _, err := w.fPO.f.Seek(0, 0); err != nil { + + f, err := fileutil.OpenMmapFile(w.fPO.name) + if err != nil { return err } - - buf := make([]byte, 1<<20) - l := 0 - for { - n, err := w.fPO.f.Read(buf) - if err != nil && err != io.EOF { - return err - } - if n == 0 { - break - } - l += n - w.crc32.Write(buf[:n]) - if err := w.write(buf[:n]); err != nil { + defer f.Close() + d := encoding.NewDecbufRaw(realByteSlice(f.Bytes()), int(w.fPO.pos)) + cnt := w.cntPO + for d.Err() == nil && cnt > 0 { + w.buf1.Reset() + w.buf1.PutUvarint(d.Uvarint()) // Keycount. + w.buf1.PutUvarintStr(yoloString(d.UvarintBytes())) // Label name. + w.buf1.PutUvarintStr(yoloString(d.UvarintBytes())) // Label value. + w.buf1.PutUvarint64(d.Uvarint64() + adjustment) // Offset. 
+ w.buf1.WriteToHash(w.crc32) + if err := w.write(w.buf1.Get()); err != nil { return err } + cnt-- } - if w.fPO.pos != uint64(l) { - return errors.Errorf("wrote %d bytes to posting offset temporary file, but only read back %d", w.fPO.pos, l) + if d.Err() != nil { + return d.Err() } // Cleanup temporary file. @@ -678,6 +699,13 @@ func (w *Writer) writePostingsOffsetTable() error { } w.fPO = nil + // Write out the length. + w.buf1.Reset() + w.buf1.PutBE32int(int(w.f.pos - startPos - 4)) + if err := w.writeAt(w.buf1.Get(), startPos); err != nil { + return err + } + // Finally write the hash. w.buf1.Reset() w.buf1.PutHashSum(w.crc32) @@ -701,7 +729,7 @@ func (w *Writer) writeTOC() error { return w.write(w.buf1.Get()) } -func (w *Writer) writePostings() error { +func (w *Writer) writePostingsToTmpFiles() error { names := make([]string, 0, len(w.labelNames)) for n := range w.labelNames { names = append(names, n) @@ -729,9 +757,10 @@ func (w *Writer) writePostings() error { } offsets = append(offsets, uint32(startPos/16)) // Skip to next series. - d.Skip(d.Uvarint() + crc32.Size) + x := d.Uvarint() + d.Skip(x + crc32.Size) if err := d.Err(); err != nil { - return nil + return err } } if err := w.writePosting("", "", offsets); err != nil { @@ -827,7 +856,7 @@ func (w *Writer) writePostings() error { func (w *Writer) writePosting(name, value string, offs []uint32) error { // Align beginning to 4 bytes for more efficient postings list scans. - if err := w.addPadding(4); err != nil { + if err := w.fP.addPadding(4); err != nil { return err } @@ -836,7 +865,7 @@ func (w *Writer) writePosting(name, value string, offs []uint32) error { w.buf1.PutUvarint(2) w.buf1.PutUvarintStr(name) w.buf1.PutUvarintStr(value) - w.buf1.PutUvarint64(w.f.pos) + w.buf1.PutUvarint64(w.fP.pos) // This is relative to the postings tmp file, not the final index file. 
if err := w.fPO.write(w.buf1.Get()); err != nil { return err } @@ -855,7 +884,37 @@ func (w *Writer) writePosting(name, value string, offs []uint32) error { w.buf2.Reset() w.buf2.PutBE32int(w.buf1.Len()) w.buf1.PutHash(w.crc32) - return w.write(w.buf2.Get(), w.buf1.Get()) + return w.fP.write(w.buf2.Get(), w.buf1.Get()) +} + +func (w *Writer) writePostings() error { + // There's padding in the tmp file, make sure it actually works. + if err := w.f.addPadding(4); err != nil { + return err + } + + // Copy temporary file into main index. + if err := w.fP.flush(); err != nil { + return err + } + if _, err := w.fP.f.Seek(0, 0); err != nil { + return err + } + // Don't need to calculate a checksum, so can copy directly. + n, err := io.CopyBuffer(w.f.fbuf, w.fP.f, make([]byte, 1<<20)) + if err != nil { + return err + } + if uint64(n) != w.fP.pos { + return errors.Errorf("wrote %d bytes to posting temporary file, but only read back %d", w.fP.pos, n) + } + w.f.pos += uint64(n) + + if err := w.fP.remove(); err != nil { + return err + } + w.fP = nil + return nil } type uint32slice []uint32 @@ -883,6 +942,11 @@ func (w *Writer) Close() error { return err } } + if w.fP != nil { + if err := w.fP.close(); err != nil { + return err + } + } if w.fPO != nil { if err := w.fPO.close(); err != nil { return err From 2b653ee2301c2658f8ffec0160c0dc2ad92a9441 Mon Sep 17 00:00:00 2001 From: Brian Brazil Date: Wed, 18 Dec 2019 01:29:41 +0000 Subject: [PATCH 5/5] Write label indices based on the posting offset table. This avoids having to build it up in RAM, and means that all variable memory usage for compactions is now 0.25 bytes per symbol plus a few O(labelnames) structures. So in practice, pretty close to constant memory for compactions. 
benchmark old ns/op new ns/op delta BenchmarkCompaction/type=normal,blocks=4,series=10000,samplesPerSeriesPerBlock=101-4 662974828 667162981 +0.63% BenchmarkCompaction/type=normal,blocks=4,series=10000,samplesPerSeriesPerBlock=1001-4 2459590377 2131168138 -13.35% BenchmarkCompaction/type=normal,blocks=4,series=10000,samplesPerSeriesPerBlock=2001-4 3808280548 3919290378 +2.91% BenchmarkCompaction/type=normal,blocks=4,series=10000,samplesPerSeriesPerBlock=5001-4 8513884311 8738099339 +2.63% BenchmarkCompaction/type=vertical,blocks=4,series=10000,samplesPerSeriesPerBlock=101-4 1898843003 1944131966 +2.39% BenchmarkCompaction/type=vertical,blocks=4,series=10000,samplesPerSeriesPerBlock=1001-4 5601478437 6031391658 +7.67% BenchmarkCompaction/type=vertical,blocks=4,series=10000,samplesPerSeriesPerBlock=2001-4 11225096097 11359624463 +1.20% BenchmarkCompaction/type=vertical,blocks=4,series=10000,samplesPerSeriesPerBlock=5001-4 23994637282 23919583343 -0.31% BenchmarkCompactionFromHead/labelnames=1,labelvalues=100000-4 891042098 826898358 -7.20% BenchmarkCompactionFromHead/labelnames=10,labelvalues=10000-4 915949138 902555676 -1.46% BenchmarkCompactionFromHead/labelnames=100,labelvalues=1000-4 955138431 879067946 -7.96% BenchmarkCompactionFromHead/labelnames=1000,labelvalues=100-4 991447640 958785968 -3.29% BenchmarkCompactionFromHead/labelnames=10000,labelvalues=10-4 1068729356 980249080 -8.28% benchmark old allocs new allocs delta BenchmarkCompaction/type=normal,blocks=4,series=10000,samplesPerSeriesPerBlock=101-4 470778 470556 -0.05% BenchmarkCompaction/type=normal,blocks=4,series=10000,samplesPerSeriesPerBlock=1001-4 791429 791225 -0.03% BenchmarkCompaction/type=normal,blocks=4,series=10000,samplesPerSeriesPerBlock=2001-4 1111514 1111257 -0.02% BenchmarkCompaction/type=normal,blocks=4,series=10000,samplesPerSeriesPerBlock=5001-4 2111498 2111369 -0.01% BenchmarkCompaction/type=vertical,blocks=4,series=10000,samplesPerSeriesPerBlock=101-4 841433 841220 -0.03% 
BenchmarkCompaction/type=vertical,blocks=4,series=10000,samplesPerSeriesPerBlock=1001-4 1911469 1911202 -0.01% BenchmarkCompaction/type=vertical,blocks=4,series=10000,samplesPerSeriesPerBlock=2001-4 3041558 3041328 -0.01% BenchmarkCompaction/type=vertical,blocks=4,series=10000,samplesPerSeriesPerBlock=5001-4 6741534 6741382 -0.00% BenchmarkCompactionFromHead/labelnames=1,labelvalues=100000-4 824856 820873 -0.48% BenchmarkCompactionFromHead/labelnames=10,labelvalues=10000-4 887220 885180 -0.23% BenchmarkCompactionFromHead/labelnames=100,labelvalues=1000-4 905253 901539 -0.41% BenchmarkCompactionFromHead/labelnames=1000,labelvalues=100-4 925148 913632 -1.24% BenchmarkCompactionFromHead/labelnames=10000,labelvalues=10-4 1019141 978727 -3.97% benchmark old bytes new bytes delta BenchmarkCompaction/type=normal,blocks=4,series=10000,samplesPerSeriesPerBlock=101-4 35694744 41523836 +16.33% BenchmarkCompaction/type=normal,blocks=4,series=10000,samplesPerSeriesPerBlock=1001-4 53405264 59499056 +11.41% BenchmarkCompaction/type=normal,blocks=4,series=10000,samplesPerSeriesPerBlock=2001-4 74160320 78151568 +5.38% BenchmarkCompaction/type=normal,blocks=4,series=10000,samplesPerSeriesPerBlock=5001-4 120878480 135364672 +11.98% BenchmarkCompaction/type=vertical,blocks=4,series=10000,samplesPerSeriesPerBlock=101-4 203832448 209925504 +2.99% BenchmarkCompaction/type=vertical,blocks=4,series=10000,samplesPerSeriesPerBlock=1001-4 341029208 346551064 +1.62% BenchmarkCompaction/type=vertical,blocks=4,series=10000,samplesPerSeriesPerBlock=2001-4 580217176 582345224 +0.37% BenchmarkCompaction/type=vertical,blocks=4,series=10000,samplesPerSeriesPerBlock=5001-4 1356872288 1363495368 +0.49% BenchmarkCompactionFromHead/labelnames=1,labelvalues=100000-4 119535672 94815920 -20.68% BenchmarkCompactionFromHead/labelnames=10,labelvalues=10000-4 115352280 95980776 -16.79% BenchmarkCompactionFromHead/labelnames=100,labelvalues=1000-4 119472320 98724460 -17.37% 
BenchmarkCompactionFromHead/labelnames=1000,labelvalues=100-4 111979312 94325456 -15.77% BenchmarkCompactionFromHead/labelnames=10000,labelvalues=10-4 116628584 98566344 -15.49% Signed-off-by: Brian Brazil --- tsdb/index/index.go | 96 +++++++++++++++++++++++++++++---------------- 1 file changed, 62 insertions(+), 34 deletions(-) diff --git a/tsdb/index/index.go b/tsdb/index/index.go index 06c7e2563..700c5da7e 100644 --- a/tsdb/index/index.go +++ b/tsdb/index/index.go @@ -15,6 +15,7 @@ package index import ( "bufio" + "bytes" "context" "encoding/binary" "hash" @@ -116,8 +117,9 @@ type Writer struct { fPO *fileWriter cntPO uint64 - toc TOC - stage indexWriterStage + toc TOC + stage indexWriterStage + postingsStart uint64 // Due to padding, can differ from TOC entry. // Reusable memory. buf1 encoding.Encbuf @@ -128,9 +130,8 @@ type Writer struct { symbolFile *fileutil.MmapFile lastSymbol string - labelIndexes []labelIndexHashEntry // Label index offsets. - labelValues map[string]map[uint32]struct{} // Label names, and their values's symbol indexes. - labelNames map[string]uint64 // Label names, and their usage. + labelIndexes []labelIndexHashEntry // Label index offsets. + labelNames map[string]uint64 // Label names, and their usage. // Hold last series to validate that clients insert new series in order. lastSeries labels.Labels @@ -223,9 +224,8 @@ func NewWriter(ctx context.Context, fn string) (*Writer, error) { buf1: encoding.Encbuf{B: make([]byte, 0, 1<<22)}, buf2: encoding.Encbuf{B: make([]byte, 0, 1<<22)}, - labelNames: make(map[string]uint64, 1<<8), - labelValues: make(map[string]map[uint32]struct{}, 1<<8), - crc32: newCRC32(), + labelNames: make(map[string]uint64, 1<<8), + crc32: newCRC32(), } if err := iw.writeMeta(); err != nil { return nil, err @@ -277,7 +277,7 @@ func (fw *fileWriter) write(bufs ...[]byte) error { // Once we move to compressed/varint representations in those areas, this limitation // can be lifted. 
if fw.pos > 16*math.MaxUint32 { - return errors.Errorf("exceeding max size of 64GiB") + return errors.Errorf("%q exceeding max size of 64GiB", fw.name) } } return nil @@ -331,9 +331,11 @@ func (w *Writer) ensureStage(s indexWriterStage) error { if w.stage == s { return nil } - if w.stage+1 < s { + if w.stage < s-1 { // A stage has been skipped. - w.ensureStage(s - 1) + if err := w.ensureStage(s - 1); err != nil { + return err + } } if w.stage > s { return errors.Errorf("invalid stage %q, currently at %q", s, w.stage) @@ -420,7 +422,6 @@ func (w *Writer) AddSeries(ref uint64, lset labels.Labels, chunks ...chunks.Meta w.buf2.PutUvarint(len(lset)) for _, l := range lset { - // here we have an index for the symbol file if v2, otherwise it's an offset index, err := w.symbols.ReverseLookup(l.Name) if err != nil { return errors.Errorf("symbol entry for %q does not exist, %v", l.Name, err) @@ -433,11 +434,6 @@ func (w *Writer) AddSeries(ref uint64, lset labels.Labels, chunks ...chunks.Meta return errors.Errorf("symbol entry for %q does not exist, %v", l.Value, err) } w.buf2.PutUvarint32(index) - - if _, ok := w.labelValues[l.Name]; !ok { - w.labelValues[l.Name] = map[uint32]struct{}{} - } - w.labelValues[l.Name][index] = struct{}{} } w.buf2.PutUvarint(len(chunks)) @@ -536,19 +532,52 @@ func (w *Writer) finishSymbols() error { } func (w *Writer) writeLabelIndices() error { - names := make([]string, 0, len(w.labelValues)) - for n := range w.labelValues { - names = append(names, n) + if err := w.fPO.flush(); err != nil { + return err } - sort.Strings(names) - for _, n := range names { - values := make([]uint32, 0, len(w.labelValues[n])) - for v := range w.labelValues[n] { - values = append(values, v) + // Find all the label values in the tmp posting offset table. 
+ f, err := fileutil.OpenMmapFile(w.fPO.name) + if err != nil { + return err + } + defer f.Close() + + d := encoding.NewDecbufRaw(realByteSlice(f.Bytes()), int(w.fPO.pos)) + cnt := w.cntPO + current := []byte{} + values := []uint32{} + for d.Err() == nil && cnt > 0 { + cnt-- + d.Uvarint() // Keycount. + name := d.UvarintBytes() // Label name. + value := yoloString(d.UvarintBytes()) // Label value. + d.Uvarint64() // Offset. + if len(name) == 0 { + continue // All index is ignored. } - sort.Sort(uint32slice(values)) - if err := w.writeLabelIndex(n, values); err != nil { + + if !bytes.Equal(name, current) && len(values) > 0 { + // We've reached a new label name. + if err := w.writeLabelIndex(string(current), values); err != nil { + return err + } + values = values[:0] + } + current = name + sid, err := w.symbols.ReverseLookup(value) + if err != nil { + return err + } + values = append(values, sid) + } + if d.Err() != nil { + return d.Err() + } + + // Handle the last label. + if len(values) > 0 { + if err := w.writeLabelIndex(string(current), values); err != nil { return err } } @@ -657,7 +686,7 @@ func (w *Writer) writePostingsOffsetTable() error { // Copy over the tmp posting offset table, however we need to // adjust the offsets. - adjustment := w.toc.Postings + adjustment := w.postingsStart w.buf1.Reset() w.crc32.Reset() @@ -888,10 +917,11 @@ func (w *Writer) writePosting(name, value string, offs []uint32) error { } func (w *Writer) writePostings() error { - // There's padding in the tmp filem make sure it actually works. + // There's padding in the tmp file, make sure it actually works. if err := w.f.addPadding(4); err != nil { return err } + w.postingsStart = w.f.pos // Copy temporary file into main index. 
if err := w.fP.flush(); err != nil { @@ -910,6 +940,9 @@ func (w *Writer) writePostings() error { } w.f.pos += uint64(n) + if err := w.fP.close(); err != nil { + return err + } if err := w.fP.remove(); err != nil { return err } @@ -928,11 +961,6 @@ type labelIndexHashEntry struct { offset uint64 } -type postingsHashEntry struct { - name, value string - offset uint64 -} - func (w *Writer) Close() error { if err := w.ensureStage(idxStageDone); err != nil { return err