diff --git a/Documentation/format/index.md b/Documentation/format/index.md index 5fbdb176e..606de597a 100644 --- a/Documentation/format/index.md +++ b/Documentation/format/index.md @@ -18,14 +18,14 @@ It is terminated by a table of contents which serves as an entry point into the │ ├──────────────────────────────────────────────┤ │ │ │ Label Index N │ │ │ ├──────────────────────────────────────────────┤ │ -│ │ Label Index Table │ │ -│ ├──────────────────────────────────────────────┤ │ │ │ Postings 1 │ │ │ ├──────────────────────────────────────────────┤ │ │ │ ... │ │ │ ├──────────────────────────────────────────────┤ │ │ │ Postings N │ │ │ ├──────────────────────────────────────────────┤ │ +│ │ Label Index Table │ │ +│ ├──────────────────────────────────────────────┤ │ │ │ Postings Table │ │ │ ├──────────────────────────────────────────────┤ │ │ │ TOC │ │ diff --git a/block.go b/block.go index 232e64e67..90915a1f0 100644 --- a/block.go +++ b/block.go @@ -26,6 +26,7 @@ import ( "github.com/prometheus/tsdb/labels" ) +// DiskBlock represents a data block backed by on-disk data. type DiskBlock interface { BlockReader @@ -42,6 +43,7 @@ type DiskBlock interface { Close() error } +// BlockReader provides reading access to a data block. type BlockReader interface { // Index returns an IndexReader over the block's data. Index() IndexReader @@ -53,11 +55,6 @@ type BlockReader interface { Tombstones() TombstoneReader } -// Snapshottable defines an entity that can be backedup online. -type Snapshottable interface { - Snapshot(dir string) error -} - // Appendable defines an entity to which data can be appended. type Appendable interface { // Appender returns a new Appender against an underlying store. diff --git a/encoding_helpers.go b/encoding_helpers.go index 17c3ff081..9aa4ba409 100644 --- a/encoding_helpers.go +++ b/encoding_helpers.go @@ -77,22 +77,6 @@ func (d *decbuf) uvarint32() uint32 { return uint32(d.uvarint64()) } func (d *decbuf) be32int() int { return int(d.be32()) } func (d *decbuf) be64int64() int64 { return int64(d.be64()) } -// uvarintTempStr decodes like uvarintStr but the returned string is -// not safe to use if the underyling buffer changes. -func (d *decbuf) uvarintTempStr() string { - l := d.uvarint64() - if d.e != nil { - return "" - } - if len(d.b) < int(l) { - d.e = errInvalidSize - return "" - } - s := yoloString(d.b[:l]) - d.b = d.b[l:] - return s -} - func (d *decbuf) uvarintStr() string { l := d.uvarint64() if d.e != nil { diff --git a/index.go b/index.go index 3cdaad74d..9beb916f4 100644 --- a/index.go +++ b/index.go @@ -232,13 +232,13 @@ func (w *indexWriter) ensureStage(s indexWriterStage) error { w.toc.labelIndices = w.pos case idxStagePostings: + w.toc.postings = w.pos + + case idxStageDone: w.toc.labelIndicesTable = w.pos if err := w.writeOffsetTable(w.labelIndexes); err != nil { return err } - w.toc.postings = w.pos - - case idxStageDone: w.toc.postingsTable = w.pos if err := w.writeOffsetTable(w.postings); err != nil { return err @@ -404,10 +404,8 @@ func (w *indexWriter) WriteLabelIndex(names []string, values []string) error { // writeOffsetTable writes a sequence of readable hash entries. func (w *indexWriter) writeOffsetTable(entries []hashEntry) error { - w.buf1.reset() - w.buf1.putBE32int(len(entries)) - w.buf2.reset() + w.buf2.putBE32int(len(entries)) for _, e := range entries { w.buf2.putUvarint(len(e.keys)) @@ -417,6 +415,7 @@ func (w *indexWriter) writeOffsetTable(entries []hashEntry) error { w.buf2.putUvarint64(e.offset) } + w.buf1.reset() w.buf1.putBE32int(w.buf2.len()) w.buf2.putHash(w.crc32) @@ -563,6 +562,12 @@ type indexReader struct { // Cached hashmaps of section offsets. labels map[string]uint32 postings map[string]uint32 + // Cache of read symbols. Strings that are returned when reading from the + // block are always backed by true strings held in here rather than + // strings that are backed by byte slices from the mmap'd index file. This + // prevents memory faults when applications work with read symbols after + // the block has been unmapped. + symbols map[uint32]string } var ( @@ -579,7 +584,11 @@ func newIndexReader(dir string) (*indexReader, error) { if err != nil { return nil, err } - r := &indexReader{b: f.b, c: f} + r := &indexReader{ + b: f.b, + c: f, + symbols: map[uint32]string{}, + } // Verify magic number. if len(f.b) < 4 { @@ -592,6 +601,9 @@ func newIndexReader(dir string) (*indexReader, error) { if err := r.readTOC(); err != nil { return nil, errors.Wrap(err, "read TOC") } + if err := r.readSymbols(int(r.toc.symbols)); err != nil { + return nil, errors.Wrap(err, "read symbols") + } r.labels, err = r.readOffsetTable(r.toc.labelIndicesTable) if err != nil { @@ -623,21 +635,40 @@ func (r *indexReader) decbufAt(off int) decbuf { return decbuf{b: r.b[off:]} } +// readSymbols reads the symbol table fully into memory and allocates proper strings for them. +// Strings backed by the mmap'd memory would cause memory faults if applications keep using them +// after the reader is closed. +func (r *indexReader) readSymbols(off int) error { + if off == 0 { + return nil + } + var ( + d1 = r.decbufAt(int(off)) + d2 = d1.decbuf(d1.be32int()) + origLen = d2.len() + cnt = d2.be32int() + basePos = uint32(off) + 4 + nextPos = basePos + uint32(origLen-d2.len()) + ) + for d2.err() == nil && d2.len() > 0 && cnt > 0 { + s := d2.uvarintStr() + r.symbols[uint32(nextPos)] = s + + nextPos = basePos + uint32(origLen-d2.len()) + cnt-- + } + return d2.err() +} + // readOffsetTable reads an offset table at the given position and returns a map // with the key strings concatenated by the 0xff unicode non-character. func (r *indexReader) readOffsetTable(off uint64) (map[string]uint32, error) { - // A table might not have been written at all, in which case the position - // is zeroed out. - if off == 0 { - return nil, nil - } - const sep = "\xff" var ( d1 = r.decbufAt(int(off)) - cnt = d1.be32() d2 = d1.decbuf(d1.be32int()) + cnt = d2.be32() ) res := make(map[string]uint32, 512) @@ -647,7 +678,7 @@ func (r *indexReader) readOffsetTable(off uint64) (map[string]uint32, error) { keys := make([]string, 0, keyCount) for i := 0; i < keyCount; i++ { - keys = append(keys, d2.uvarintTempStr()) + keys = append(keys, d2.uvarintStr()) } res[strings.Join(keys, sep)] = uint32(d2.uvarint()) @@ -682,28 +713,20 @@ func (r *indexReader) section(o uint32) (byte, []byte, error) { } func (r *indexReader) lookupSymbol(o uint32) (string, error) { - d := r.decbufAt(int(o)) - - s := d.uvarintTempStr() - if d.err() != nil { - return "", errors.Wrapf(d.err(), "read symbol at %d", o) + s, ok := r.symbols[o] + if !ok { + return "", errors.Errorf("unknown symbol offset %d", o) } return s, nil } func (r *indexReader) Symbols() (map[string]struct{}, error) { - d1 := r.decbufAt(int(r.toc.symbols)) - d2 := d1.decbuf(d1.be32int()) + res := make(map[string]struct{}, len(r.symbols)) - count := d2.be32int() - sym := make(map[string]struct{}, count) - - for ; count > 0; count-- { - s := d2.uvarintTempStr() - sym[s] = struct{}{} + for _, s := range r.symbols { + res[s] = struct{}{} } - - return sym, d2.err() + return res, nil } func (r *indexReader) LabelValues(names ...string) (StringTuples, error) { diff --git a/index_test.go b/index_test.go index 12329b560..c97cf380c 100644 --- a/index_test.go +++ b/index_test.go @@ -221,7 +221,7 @@ func TestIndexRW_Postings(t *testing.T) { require.NoError(t, err) require.Equal(t, 0, len(c)) - require.Equal(t, l, series[i]) + require.Equal(t, series[i], l) } require.NoError(t, p.Err())