diff --git a/tsdb/block.go b/tsdb/block.go index c4976b04e..f794dbe6a 100644 --- a/tsdb/block.go +++ b/tsdb/block.go @@ -50,10 +50,6 @@ type IndexWriter interface { // are added later. AddSeries(ref uint64, l labels.Labels, chunks ...chunks.Meta) error - // WriteLabelIndex serializes an index from label names to values. - // The passed in values chained tuples of strings of the length of names. - WriteLabelIndex(names []string, values []string) error - // Close writes any finalization and closes the resources associated with // the underlying writer. Close() error diff --git a/tsdb/compact.go b/tsdb/compact.go index a3dae8729..47151dad6 100644 --- a/tsdb/compact.go +++ b/tsdb/compact.go @@ -722,11 +722,6 @@ func (c *LeveledCompactor) populateBlock(blocks []BlockReader, meta *BlockMeta, symbols = newMergedStringIter(symbols, syms) } - var ( - values = map[string]stringset{} - ref = uint64(0) - ) - for symbols.Next() { if err := indexw.AddSymbol(symbols.At()); err != nil { return errors.Wrap(err, "add symbol") @@ -737,6 +732,7 @@ func (c *LeveledCompactor) populateBlock(blocks []BlockReader, meta *BlockMeta, } delIter := &deletedIterator{} + ref := uint64(0) for set.Next() { select { case <-c.ctx.Done(): @@ -836,33 +832,12 @@ func (c *LeveledCompactor) populateBlock(blocks []BlockReader, meta *BlockMeta, } } - for _, l := range lset { - valset, ok := values[l.Name] - if !ok { - valset = stringset{} - values[l.Name] = valset - } - valset.set(l.Value) - } - ref++ } if set.Err() != nil { return errors.Wrap(set.Err(), "iterate compaction set") } - s := make([]string, 0, 256) - for n, v := range values { - s = s[:0] - - for x := range v { - s = append(s, x) - } - if err := indexw.WriteLabelIndex([]string{n}, s); err != nil { - return errors.Wrap(err, "write label index") - } - } - return nil } diff --git a/tsdb/index/index.go b/tsdb/index/index.go index fb68c71a2..700c5da7e 100644 --- a/tsdb/index/index.go +++ b/tsdb/index/index.go @@ -15,6 +15,7 @@ package index import ( "bufio" + "bytes" "context" "encoding/binary" "hash" @@ -70,8 +71,6 @@ const ( idxStageNone indexWriterStage = iota idxStageSymbols idxStageSeries - idxStageLabelIndex - idxStagePostings idxStageDone ) @@ -83,10 +82,6 @@ func (s indexWriterStage) String() string { return "symbols" case idxStageSeries: return "series" - case idxStageLabelIndex: - return "label index" - case idxStagePostings: - return "postings" case idxStageDone: return "done" } @@ -111,13 +106,20 @@ func newCRC32() hash.Hash32 { // Writer implements the IndexWriter interface for the standard // serialization format. type Writer struct { - ctx context.Context - f *os.File - fbuf *bufio.Writer - pos uint64 + ctx context.Context - toc TOC - stage indexWriterStage + // For the main index file. + f *fileWriter + + // Temporary file for postings. + fP *fileWriter + // Temporary file for posting offsets table. + fPO *fileWriter + cntPO uint64 + + toc TOC + stage indexWriterStage + postingsStart uint64 // Due to padding, can differ from TOC entry. // Reusable memory. buf1 encoding.Encbuf @@ -128,9 +130,8 @@ type Writer struct { symbolFile *fileutil.MmapFile lastSymbol string - labelIndexes []labelIndexHashEntry // label index offsets - postings []postingsHashEntry // postings lists offsets - labelNames map[string]uint64 // label names, and their usage + labelIndexes []labelIndexHashEntry // Label index offsets. + labelNames map[string]uint64 // Label names, and their usage. // Hold last series to validate that clients insert new series in order. lastSeries labels.Labels @@ -193,7 +194,18 @@ func NewWriter(ctx context.Context, fn string) (*Writer, error) { return nil, errors.Wrap(err, "remove any existing index at path") } - f, err := os.OpenFile(fn, os.O_CREATE|os.O_WRONLY, 0666) + // Main index file we are building. + f, err := newFileWriter(fn) + if err != nil { + return nil, err + } + // Temporary file for postings. + fP, err := newFileWriter(fn + "_tmp_p") + if err != nil { + return nil, err + } + // Temporary file for posting offset table. + fPO, err := newFileWriter(fn + "_tmp_po") if err != nil { return nil, err } @@ -204,15 +216,14 @@ func NewWriter(ctx context.Context, fn string) (*Writer, error) { iw := &Writer{ ctx: ctx, f: f, - fbuf: bufio.NewWriterSize(f, 1<<22), - pos: 0, + fP: fP, + fPO: fPO, stage: idxStageNone, // Reusable memory. buf1: encoding.Encbuf{B: make([]byte, 0, 1<<22)}, buf2: encoding.Encbuf{B: make([]byte, 0, 1<<22)}, - // Caches. labelNames: make(map[string]uint64, 1<<8), crc32: newCRC32(), } @@ -223,9 +234,41 @@ func NewWriter(ctx context.Context, fn string) (*Writer, error) { } func (w *Writer) write(bufs ...[]byte) error { + return w.f.write(bufs...) +} + +func (w *Writer) writeAt(buf []byte, pos uint64) error { + return w.f.writeAt(buf, pos) +} + +func (w *Writer) addPadding(size int) error { + return w.f.addPadding(size) +} + +type fileWriter struct { + f *os.File + fbuf *bufio.Writer + pos uint64 + name string +} + +func newFileWriter(name string) (*fileWriter, error) { + f, err := os.OpenFile(name, os.O_CREATE|os.O_RDWR, 0666) + if err != nil { + return nil, err + } + return &fileWriter{ + f: f, + fbuf: bufio.NewWriterSize(f, 1<<22), + pos: 0, + name: name, + }, nil +} + +func (fw *fileWriter) write(bufs ...[]byte) error { for _, b := range bufs { - n, err := w.fbuf.Write(b) - w.pos += uint64(n) + n, err := fw.fbuf.Write(b) + fw.pos += uint64(n) if err != nil { return err } @@ -233,29 +276,47 @@ func (w *Writer) write(bufs ...[]byte) error { // offset references in v1 are only 4 bytes large. // Once we move to compressed/varint representations in those areas, this limitation // can be lifted. - if w.pos > 16*math.MaxUint32 { - return errors.Errorf("exceeding max size of 64GiB") + if fw.pos > 16*math.MaxUint32 { + return errors.Errorf("%q exceeding max size of 64GiB", fw.name) } } return nil } -func (w *Writer) writeAt(buf []byte, pos uint64) error { - if err := w.fbuf.Flush(); err != nil { +func (fw *fileWriter) flush() error { + return fw.fbuf.Flush() +} + +func (fw *fileWriter) writeAt(buf []byte, pos uint64) error { + if err := fw.flush(); err != nil { return err } - _, err := w.f.WriteAt(buf, int64(pos)) + _, err := fw.f.WriteAt(buf, int64(pos)) return err } // addPadding adds zero byte padding until the file size is a multiple size. -func (w *Writer) addPadding(size int) error { - p := w.pos % uint64(size) +func (fw *fileWriter) addPadding(size int) error { + p := fw.pos % uint64(size) if p == 0 { return nil } p = uint64(size) - p - return errors.Wrap(w.write(make([]byte, p)), "add padding") + return errors.Wrap(fw.write(make([]byte, p)), "add padding") +} + +func (fw *fileWriter) close() error { + if err := fw.flush(); err != nil { + return err + } + if err := fw.f.Sync(); err != nil { + return err + } + return fw.f.Close() +} + +func (fw *fileWriter) remove() error { + return os.Remove(fw.name) } // ensureStage handles transitions between write stages and ensures that IndexWriter @@ -270,6 +331,12 @@ func (w *Writer) ensureStage(s indexWriterStage) error { if w.stage == s { return nil } + if w.stage < s-1 { + // A stage has been skipped. + if err := w.ensureStage(s - 1); err != nil { + return err + } + } if w.stage > s { return errors.Errorf("invalid stage %q, currently at %q", s, w.stage) } @@ -277,7 +344,7 @@ func (w *Writer) ensureStage(s indexWriterStage) error { // Mark start of sections in table of contents. switch s { case idxStageSymbols: - w.toc.Symbols = w.pos + w.toc.Symbols = w.f.pos if err := w.startSymbols(); err != nil { return err } @@ -285,22 +352,30 @@ func (w *Writer) ensureStage(s indexWriterStage) error { if err := w.finishSymbols(); err != nil { return err } - w.toc.Series = w.pos - - case idxStageLabelIndex: - w.toc.LabelIndices = w.pos + w.toc.Series = w.f.pos case idxStageDone: - w.toc.Postings = w.pos + w.toc.LabelIndices = w.f.pos + // LabelIndices generation depends on the posting offset + // table produced at this stage. + if err := w.writePostingsToTmpFiles(); err != nil { + return err + } + if err := w.writeLabelIndices(); err != nil { + return err + } + + w.toc.Postings = w.f.pos if err := w.writePostings(); err != nil { return err } - w.toc.LabelIndicesTable = w.pos + w.toc.LabelIndicesTable = w.f.pos if err := w.writeLabelIndexesOffsetTable(); err != nil { return err } - w.toc.PostingsTable = w.pos + + w.toc.PostingsTable = w.f.pos if err := w.writePostingsOffsetTable(); err != nil { return err } @@ -339,15 +414,14 @@ func (w *Writer) AddSeries(ref uint64, lset labels.Labels, chunks ...chunks.Meta return errors.Errorf("failed to write padding bytes: %v", err) } - if w.pos%16 != 0 { - return errors.Errorf("series write not 16-byte aligned at %d", w.pos) + if w.f.pos%16 != 0 { + return errors.Errorf("series write not 16-byte aligned at %d", w.f.pos) } w.buf2.Reset() w.buf2.PutUvarint(len(lset)) for _, l := range lset { - // here we have an index for the symbol file if v2, otherwise it's an offset index, err := w.symbols.ReverseLookup(l.Name) if err != nil { return errors.Errorf("symbol entry for %q does not exist, %v", l.Name, err) @@ -421,24 +495,24 @@ func (w *Writer) AddSymbol(sym string) error { func (w *Writer) finishSymbols() error { // Write out the length and symbol count. w.buf1.Reset() - w.buf1.PutBE32int(int(w.pos - w.toc.Symbols - 4)) + w.buf1.PutBE32int(int(w.f.pos - w.toc.Symbols - 4)) w.buf1.PutBE32int(int(w.numSymbols)) if err := w.writeAt(w.buf1.Get(), w.toc.Symbols); err != nil { return err } - hashPos := w.pos + hashPos := w.f.pos // Leave space for the hash. We can only calculate it // now that the number of symbols is known, so mmap and do it from there. if err := w.write([]byte("hash")); err != nil { return err } - if err := w.fbuf.Flush(); err != nil { + if err := w.f.flush(); err != nil { return err } var err error - w.symbolFile, err = fileutil.OpenMmapFile(w.f.Name()) + w.symbolFile, err = fileutil.OpenMmapFile(w.f.name) if err != nil { return err } @@ -457,31 +531,71 @@ func (w *Writer) finishSymbols() error { return nil } -func (w *Writer) WriteLabelIndex(names []string, values []string) error { - if len(values)%len(names) != 0 { - return errors.Errorf("invalid value list length %d for %d names", len(values), len(names)) - } - if err := w.ensureStage(idxStageLabelIndex); err != nil { - return errors.Wrap(err, "ensure stage") +func (w *Writer) writeLabelIndices() error { + if err := w.fPO.flush(); err != nil { + return err } - valt, err := NewStringTuples(values, len(names)) + // Find all the label values in the tmp posting offset table. + f, err := fileutil.OpenMmapFile(w.fPO.name) if err != nil { return err } - sort.Sort(valt) + defer f.Close() + d := encoding.NewDecbufRaw(realByteSlice(f.Bytes()), int(w.fPO.pos)) + cnt := w.cntPO + current := []byte{} + values := []uint32{} + for d.Err() == nil && cnt > 0 { + cnt-- + d.Uvarint() // Keycount. + name := d.UvarintBytes() // Label name. + value := yoloString(d.UvarintBytes()) // Label value. + d.Uvarint64() // Offset. + if len(name) == 0 { + continue // All index is ignored. + } + + if !bytes.Equal(name, current) && len(values) > 0 { + // We've reached a new label name. + if err := w.writeLabelIndex(string(current), values); err != nil { + return err + } + values = values[:0] + } + current = name + sid, err := w.symbols.ReverseLookup(value) + if err != nil { + return err + } + values = append(values, sid) + } + if d.Err() != nil { + return d.Err() + } + + // Handle the last label. + if len(values) > 0 { + if err := w.writeLabelIndex(string(current), values); err != nil { + return err + } + } + return nil +} + +func (w *Writer) writeLabelIndex(name string, values []uint32) error { // Align beginning to 4 bytes for more efficient index list scans. if err := w.addPadding(4); err != nil { return err } w.labelIndexes = append(w.labelIndexes, labelIndexHashEntry{ - keys: names, - offset: w.pos, + keys: []string{name}, + offset: w.f.pos, }) - startPos := w.pos + startPos := w.f.pos // Leave 4 bytes of space for the length, which will be calculated later. if err := w.write([]byte("alen")); err != nil { return err @@ -489,21 +603,16 @@ func (w *Writer) WriteLabelIndex(names []string, values []string) error { w.crc32.Reset() w.buf1.Reset() - w.buf1.PutBE32int(len(names)) - w.buf1.PutBE32int(valt.Len()) + w.buf1.PutBE32int(1) // Number of names. + w.buf1.PutBE32int(len(values)) w.buf1.WriteToHash(w.crc32) if err := w.write(w.buf1.Get()); err != nil { return err } - // here we have an index for the symbol file if v2, otherwise it's an offset - for _, v := range valt.entries { - sid, err := w.symbols.ReverseLookup(v) - if err != nil { - return errors.Errorf("symbol entry for %q does not exist: %v", v, err) - } + for _, v := range values { w.buf1.Reset() - w.buf1.PutBE32(sid) + w.buf1.PutBE32(v) w.buf1.WriteToHash(w.crc32) if err := w.write(w.buf1.Get()); err != nil { return err @@ -512,7 +621,7 @@ func (w *Writer) WriteLabelIndex(names []string, values []string) error { // Write out the length. w.buf1.Reset() - w.buf1.PutBE32int(int(w.pos - startPos - 4)) + w.buf1.PutBE32int(int(w.f.pos - startPos - 4)) if err := w.writeAt(w.buf1.Get(), startPos); err != nil { return err } @@ -524,7 +633,7 @@ func (w *Writer) WriteLabelIndex(names []string, values []string) error { // writeLabelIndexesOffsetTable writes the label indices offset table. func (w *Writer) writeLabelIndexesOffsetTable() error { - startPos := w.pos + startPos := w.f.pos // Leave 4 bytes of space for the length, which will be calculated later. if err := w.write([]byte("alen")); err != nil { return err @@ -552,7 +661,7 @@ func (w *Writer) writeLabelIndexesOffsetTable() error { } // Write out the length. w.buf1.Reset() - w.buf1.PutBE32int(int(w.pos - startPos - 4)) + w.buf1.PutBE32int(int(w.f.pos - startPos - 4)) if err := w.writeAt(w.buf1.Get(), startPos); err != nil { return err } @@ -564,39 +673,69 @@ func (w *Writer) writeLabelIndexesOffsetTable() error { // writePostingsOffsetTable writes the postings offset table. func (w *Writer) writePostingsOffsetTable() error { - startPos := w.pos + // Ensure everything is in the temporary file. + if err := w.fPO.flush(); err != nil { + return err + } + + startPos := w.f.pos // Leave 4 bytes of space for the length, which will be calculated later. if err := w.write([]byte("alen")); err != nil { return err } - w.crc32.Reset() + + // Copy over the tmp posting offset table, however we need to + // adjust the offsets. + adjustment := w.postingsStart w.buf1.Reset() - w.buf1.PutBE32int(len(w.postings)) + w.crc32.Reset() + w.buf1.PutBE32int(int(w.cntPO)) // Count. w.buf1.WriteToHash(w.crc32) if err := w.write(w.buf1.Get()); err != nil { return err } - for _, e := range w.postings { + f, err := fileutil.OpenMmapFile(w.fPO.name) + if err != nil { + return err + } + defer f.Close() + d := encoding.NewDecbufRaw(realByteSlice(f.Bytes()), int(w.fPO.pos)) + cnt := w.cntPO + for d.Err() == nil && cnt > 0 { w.buf1.Reset() - w.buf1.PutUvarint(2) - w.buf1.PutUvarintStr(e.name) - w.buf1.PutUvarintStr(e.value) - w.buf1.PutUvarint64(e.offset) + w.buf1.PutUvarint(d.Uvarint()) // Keycount. + w.buf1.PutUvarintStr(yoloString(d.UvarintBytes())) // Label name. + w.buf1.PutUvarintStr(yoloString(d.UvarintBytes())) // Label value. + w.buf1.PutUvarint64(d.Uvarint64() + adjustment) // Offset. w.buf1.WriteToHash(w.crc32) if err := w.write(w.buf1.Get()); err != nil { return err } + cnt-- } + if d.Err() != nil { + return d.Err() + } + + // Cleanup temporary file. + if err := w.fPO.close(); err != nil { + return err + } + if err := w.fPO.remove(); err != nil { + return err + } + w.fPO = nil // Write out the length. w.buf1.Reset() - w.buf1.PutBE32int(int(w.pos - startPos - 4)) + w.buf1.PutBE32int(int(w.f.pos - startPos - 4)) if err := w.writeAt(w.buf1.Get(), startPos); err != nil { return err } + // Finally write the hash. w.buf1.Reset() w.buf1.PutHashSum(w.crc32) return w.write(w.buf1.Get()) @@ -619,17 +758,17 @@ func (w *Writer) writeTOC() error { return w.write(w.buf1.Get()) } -func (w *Writer) writePostings() error { +func (w *Writer) writePostingsToTmpFiles() error { names := make([]string, 0, len(w.labelNames)) for n := range w.labelNames { names = append(names, n) } sort.Strings(names) - if err := w.fbuf.Flush(); err != nil { + if err := w.f.flush(); err != nil { return err } - f, err := fileutil.OpenMmapFile(w.f.Name()) + f, err := fileutil.OpenMmapFile(w.f.name) if err != nil { return err } @@ -647,9 +786,10 @@ func (w *Writer) writePostings() error { } offsets = append(offsets, uint32(startPos/16)) // Skip to next series. - d.Skip(d.Uvarint() + crc32.Size) + x := d.Uvarint() + d.Skip(x + crc32.Size) if err := d.Err(); err != nil { - return nil + return err } } if err := w.writePosting("", "", offsets); err != nil { @@ -745,15 +885,20 @@ func (w *Writer) writePostings() error { func (w *Writer) writePosting(name, value string, offs []uint32) error { // Align beginning to 4 bytes for more efficient postings list scans. - if err := w.addPadding(4); err != nil { + if err := w.fP.addPadding(4); err != nil { return err } - w.postings = append(w.postings, postingsHashEntry{ - name: name, - value: value, - offset: w.pos, - }) + // Write out postings offset table to temporary file as we go. + w.buf1.Reset() + w.buf1.PutUvarint(2) + w.buf1.PutUvarintStr(name) + w.buf1.PutUvarintStr(value) + w.buf1.PutUvarint64(w.fP.pos) // This is relative to the postings tmp file, not the final index file. + if err := w.fPO.write(w.buf1.Get()); err != nil { + return err + } + w.cntPO++ w.buf1.Reset() w.buf1.PutBE32int(len(offs)) @@ -768,7 +913,41 @@ func (w *Writer) writePosting(name, value string, offs []uint32) error { w.buf2.Reset() w.buf2.PutBE32int(w.buf1.Len()) w.buf1.PutHash(w.crc32) - return w.write(w.buf2.Get(), w.buf1.Get()) + return w.fP.write(w.buf2.Get(), w.buf1.Get()) +} + +func (w *Writer) writePostings() error { + // There's padding in the tmp file, make sure it actually works. + if err := w.f.addPadding(4); err != nil { + return err + } + w.postingsStart = w.f.pos + + // Copy temporary file into main index. + if err := w.fP.flush(); err != nil { + return err + } + if _, err := w.fP.f.Seek(0, 0); err != nil { + return err + } + // Don't need to calculate a checksum, so can copy directly. + n, err := io.CopyBuffer(w.f.fbuf, w.fP.f, make([]byte, 1<<20)) + if err != nil { + return err + } + if uint64(n) != w.fP.pos { + return errors.Errorf("wrote %d bytes to posting temporary file, but only read back %d", w.fP.pos, n) + } + w.f.pos += uint64(n) + + if err := w.fP.close(); err != nil { + return err + } + if err := w.fP.remove(); err != nil { + return err + } + w.fP = nil + return nil } type uint32slice []uint32 @@ -782,11 +961,6 @@ type labelIndexHashEntry struct { offset uint64 } -type postingsHashEntry struct { - name, value string - offset uint64 -} - func (w *Writer) Close() error { if err := w.ensureStage(idxStageDone); err != nil { return err @@ -796,13 +970,17 @@ func (w *Writer) Close() error { return err } } - if err := w.fbuf.Flush(); err != nil { - return err + if w.fP != nil { + if err := w.fP.close(); err != nil { + return err + } } - if err := w.f.Sync(); err != nil { - return err + if w.fPO != nil { + if err := w.fPO.close(); err != nil { + return err + } } - return w.f.Close() + return w.f.close() } // StringTuples provides access to a sorted list of string tuples. diff --git a/tsdb/index/index_test.go b/tsdb/index/index_test.go index 07f8f8a47..a8bf59112 100644 --- a/tsdb/index/index_test.go +++ b/tsdb/index/index_test.go @@ -83,16 +83,6 @@ func (m mockIndex) AddSeries(ref uint64, l labels.Labels, chunks ...chunks.Meta) return nil } -func (m mockIndex) WriteLabelIndex(names []string, values []string) error { - // TODO support composite indexes - if len(names) != 1 { - return errors.New("composite indexes not supported yet") - } - sort.Strings(values) - m.labelIndex[names[0]] = values - return nil -} - func (m mockIndex) Close() error { return nil } @@ -200,9 +190,6 @@ func TestIndexRW_Postings(t *testing.T) { testutil.Ok(t, iw.AddSeries(3, series[2])) testutil.Ok(t, iw.AddSeries(4, series[3])) - testutil.Ok(t, iw.WriteLabelIndex([]string{"a"}, []string{"1"})) - testutil.Ok(t, iw.WriteLabelIndex([]string{"b"}, []string{"1", "2", "3", "4"})) - testutil.Ok(t, iw.Close()) ir, err := NewFileReader(fn) @@ -289,8 +276,6 @@ func TestPostingsMany(t *testing.T) { for i, s := range series { testutil.Ok(t, iw.AddSeries(uint64(i), s)) } - err = iw.WriteLabelIndex([]string{"foo"}, []string{"bar"}) - testutil.Ok(t, err) testutil.Ok(t, iw.Close()) ir, err := NewFileReader(fn) @@ -427,17 +412,6 @@ func TestPersistence_index_e2e(t *testing.T) { postings.Add(uint64(i), s.labels) } - for k, v := range values { - var vals []string - for e := range v { - vals = append(vals, e) - } - sort.Strings(vals) - - testutil.Ok(t, iw.WriteLabelIndex([]string{k}, vals)) - testutil.Ok(t, mi.WriteLabelIndex([]string{k}, vals)) - } - err = iw.Close() testutil.Ok(t, err)