mirror of
https://github.com/prometheus/prometheus
synced 2024-12-24 23:42:32 +00:00
Stream symbols during compaction. (#6468)
Rather than buffer up symbols in RAM, do it one by one during compaction. Then use the reader's symbol handling for symbol lookups during the rest of the index write. There is some slowdown in compaction, due to having to look through a file rather than a hash lookup. This is noise to the overall cost of compacting series with thousands of samples though. benchmark old ns/op new ns/op delta BenchmarkCompaction/type=normal,blocks=4,series=10000,samplesPerSeriesPerBlock=101-4 539917175 675341565 +25.08% BenchmarkCompaction/type=normal,blocks=4,series=10000,samplesPerSeriesPerBlock=1001-4 2441815993 2477453524 +1.46% BenchmarkCompaction/type=normal,blocks=4,series=10000,samplesPerSeriesPerBlock=2001-4 3978543559 3922909687 -1.40% BenchmarkCompaction/type=normal,blocks=4,series=10000,samplesPerSeriesPerBlock=5001-4 8430219716 8586610007 +1.86% BenchmarkCompaction/type=vertical,blocks=4,series=10000,samplesPerSeriesPerBlock=101-4 1786424591 1909552782 +6.89% BenchmarkCompaction/type=vertical,blocks=4,series=10000,samplesPerSeriesPerBlock=1001-4 5328998202 6020839950 +12.98% BenchmarkCompaction/type=vertical,blocks=4,series=10000,samplesPerSeriesPerBlock=2001-4 10085059958 11085278690 +9.92% BenchmarkCompaction/type=vertical,blocks=4,series=10000,samplesPerSeriesPerBlock=5001-4 25497010155 27018079806 +5.97% BenchmarkCompactionFromHead/labelnames=1,labelvalues=100000-4 2427391406 2817217987 +16.06% BenchmarkCompactionFromHead/labelnames=10,labelvalues=10000-4 2592965497 2538805050 -2.09% BenchmarkCompactionFromHead/labelnames=100,labelvalues=1000-4 2437388343 2668012858 +9.46% BenchmarkCompactionFromHead/labelnames=1000,labelvalues=100-4 2317095324 2787423966 +20.30% BenchmarkCompactionFromHead/labelnames=10000,labelvalues=10-4 2600239857 2096973860 -19.35% benchmark old allocs new allocs delta BenchmarkCompaction/type=normal,blocks=4,series=10000,samplesPerSeriesPerBlock=101-4 500851 470794 -6.00% BenchmarkCompaction/type=normal,blocks=4,series=10000,samplesPerSeriesPerBlock=1001-4 821527 791451 -3.66% BenchmarkCompaction/type=normal,blocks=4,series=10000,samplesPerSeriesPerBlock=2001-4 1141562 1111508 -2.63% BenchmarkCompaction/type=normal,blocks=4,series=10000,samplesPerSeriesPerBlock=5001-4 2141576 2111504 -1.40% BenchmarkCompaction/type=vertical,blocks=4,series=10000,samplesPerSeriesPerBlock=101-4 871466 841424 -3.45% BenchmarkCompaction/type=vertical,blocks=4,series=10000,samplesPerSeriesPerBlock=1001-4 1941428 1911415 -1.55% BenchmarkCompaction/type=vertical,blocks=4,series=10000,samplesPerSeriesPerBlock=2001-4 3071573 3041510 -0.98% BenchmarkCompaction/type=vertical,blocks=4,series=10000,samplesPerSeriesPerBlock=5001-4 6771648 6741509 -0.45% BenchmarkCompactionFromHead/labelnames=1,labelvalues=100000-4 731493 824888 +12.77% BenchmarkCompactionFromHead/labelnames=10,labelvalues=10000-4 793918 887311 +11.76% BenchmarkCompactionFromHead/labelnames=100,labelvalues=1000-4 811842 905204 +11.50% BenchmarkCompactionFromHead/labelnames=1000,labelvalues=100-4 832244 925081 +11.16% BenchmarkCompactionFromHead/labelnames=10000,labelvalues=10-4 921553 1019162 +10.59% benchmark old bytes new bytes delta BenchmarkCompaction/type=normal,blocks=4,series=10000,samplesPerSeriesPerBlock=101-4 40532648 35698276 -11.93% BenchmarkCompaction/type=normal,blocks=4,series=10000,samplesPerSeriesPerBlock=1001-4 60340216 53409568 -11.49% BenchmarkCompaction/type=normal,blocks=4,series=10000,samplesPerSeriesPerBlock=2001-4 81087336 72065552 -11.13% BenchmarkCompaction/type=normal,blocks=4,series=10000,samplesPerSeriesPerBlock=5001-4 142485576 120878544 -15.16% BenchmarkCompaction/type=vertical,blocks=4,series=10000,samplesPerSeriesPerBlock=101-4 208661368 203831136 -2.31% BenchmarkCompaction/type=vertical,blocks=4,series=10000,samplesPerSeriesPerBlock=1001-4 347345904 340484696 -1.98% BenchmarkCompaction/type=vertical,blocks=4,series=10000,samplesPerSeriesPerBlock=2001-4 585185856 576244648 -1.53% BenchmarkCompaction/type=vertical,blocks=4,series=10000,samplesPerSeriesPerBlock=5001-4 1357641792 1358966528 +0.10% BenchmarkCompactionFromHead/labelnames=1,labelvalues=100000-4 126486664 119666744 -5.39% BenchmarkCompactionFromHead/labelnames=10,labelvalues=10000-4 122323192 115117224 -5.89% BenchmarkCompactionFromHead/labelnames=100,labelvalues=1000-4 126404504 119469864 -5.49% BenchmarkCompactionFromHead/labelnames=1000,labelvalues=100-4 119047832 112230408 -5.73% BenchmarkCompactionFromHead/labelnames=10000,labelvalues=10-4 136576016 116634800 -14.60% Signed-off-by: Brian Brazil <brian.brazil@robustperception.io>
This commit is contained in:
parent
767fa704b6
commit
d782387f81
@ -39,8 +39,8 @@ import (
|
||||
// The methods must be called in the order they are specified in.
|
||||
type IndexWriter interface {
|
||||
// AddSymbols registers all string symbols that are encountered in series
|
||||
// and other indices.
|
||||
AddSymbols(sym map[string]struct{}) error
|
||||
// and other indices. Symbols must be added in sorted order.
|
||||
AddSymbol(sym string) error
|
||||
|
||||
// AddSeries populates the index writer with a series and its offsets
|
||||
// of chunks that the index can reference.
|
||||
@ -61,9 +61,10 @@ type IndexWriter interface {
|
||||
|
||||
// IndexReader provides reading access of serialized index data.
|
||||
type IndexReader interface {
|
||||
// Symbols returns a set of string symbols that may occur in series' labels
|
||||
// and indices.
|
||||
Symbols() (map[string]struct{}, error)
|
||||
// Symbols return an iterator over sorted string symbols that may occur in
|
||||
// series' labels and indices. It is not safe to use the returned strings
|
||||
// beyond the lifetime of the index reader.
|
||||
Symbols() index.StringIter
|
||||
|
||||
// LabelValues returns sorted possible label values.
|
||||
LabelValues(names ...string) (index.StringTuples, error)
|
||||
@ -432,9 +433,8 @@ type blockIndexReader struct {
|
||||
b *Block
|
||||
}
|
||||
|
||||
func (r blockIndexReader) Symbols() (map[string]struct{}, error) {
|
||||
s, err := r.ir.Symbols()
|
||||
return s, errors.Wrapf(err, "block: %s", r.b.Meta().ULID)
|
||||
func (r blockIndexReader) Symbols() index.StringIter {
|
||||
return r.ir.Symbols()
|
||||
}
|
||||
|
||||
func (r blockIndexReader) LabelValues(names ...string) (index.StringTuples, error) {
|
||||
|
@ -650,7 +650,7 @@ func (c *LeveledCompactor) populateBlock(blocks []BlockReader, meta *BlockMeta,
|
||||
|
||||
var (
|
||||
set ChunkSeriesSet
|
||||
allSymbols = make(map[string]struct{}, 1<<16)
|
||||
symbols index.StringIter
|
||||
closers = []io.Closer{}
|
||||
overlapping bool
|
||||
)
|
||||
@ -700,14 +700,6 @@ func (c *LeveledCompactor) populateBlock(blocks []BlockReader, meta *BlockMeta,
|
||||
}
|
||||
closers = append(closers, tombsr)
|
||||
|
||||
symbols, err := indexr.Symbols()
|
||||
if err != nil {
|
||||
return errors.Wrap(err, "read symbols")
|
||||
}
|
||||
for s := range symbols {
|
||||
allSymbols[s] = struct{}{}
|
||||
}
|
||||
|
||||
k, v := index.AllPostingsKey()
|
||||
all, err := indexr.Postings(k, v)
|
||||
if err != nil {
|
||||
@ -716,15 +708,18 @@ func (c *LeveledCompactor) populateBlock(blocks []BlockReader, meta *BlockMeta,
|
||||
all = indexr.SortedPostings(all)
|
||||
|
||||
s := newCompactionSeriesSet(indexr, chunkr, tombsr, all)
|
||||
syms := indexr.Symbols()
|
||||
|
||||
if i == 0 {
|
||||
set = s
|
||||
symbols = syms
|
||||
continue
|
||||
}
|
||||
set, err = newCompactionMerger(set, s)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
symbols = newMergedStringIter(symbols, syms)
|
||||
}
|
||||
|
||||
var (
|
||||
@ -732,8 +727,13 @@ func (c *LeveledCompactor) populateBlock(blocks []BlockReader, meta *BlockMeta,
|
||||
ref = uint64(0)
|
||||
)
|
||||
|
||||
if err := indexw.AddSymbols(allSymbols); err != nil {
|
||||
return errors.Wrap(err, "add symbols")
|
||||
for symbols.Next() {
|
||||
if err := indexw.AddSymbol(symbols.At()); err != nil {
|
||||
return errors.Wrap(err, "add symbol")
|
||||
}
|
||||
}
|
||||
if symbols.Err() != nil {
|
||||
return errors.Wrap(symbols.Err(), "next symbol")
|
||||
}
|
||||
|
||||
delIter := &deletedIterator{}
|
||||
@ -1026,3 +1026,47 @@ func (c *compactionMerger) Err() error {
|
||||
func (c *compactionMerger) At() (labels.Labels, []chunks.Meta, tombstones.Intervals) {
|
||||
return c.l, c.c, c.intervals
|
||||
}
|
||||
|
||||
func newMergedStringIter(a index.StringIter, b index.StringIter) index.StringIter {
|
||||
return &mergedStringIter{a: a, b: b, aok: a.Next(), bok: b.Next()}
|
||||
}
|
||||
|
||||
type mergedStringIter struct {
|
||||
a index.StringIter
|
||||
b index.StringIter
|
||||
aok, bok bool
|
||||
cur string
|
||||
}
|
||||
|
||||
func (m *mergedStringIter) Next() bool {
|
||||
if (!m.aok && !m.bok) || (m.Err() != nil) {
|
||||
return false
|
||||
}
|
||||
|
||||
if !m.aok {
|
||||
m.cur = m.b.At()
|
||||
m.bok = m.b.Next()
|
||||
} else if !m.bok {
|
||||
m.cur = m.a.At()
|
||||
m.aok = m.a.Next()
|
||||
} else if m.b.At() > m.a.At() {
|
||||
m.cur = m.a.At()
|
||||
m.aok = m.a.Next()
|
||||
} else if m.a.At() > m.b.At() {
|
||||
m.cur = m.b.At()
|
||||
m.bok = m.b.Next()
|
||||
} else { // Equal.
|
||||
m.cur = m.b.At()
|
||||
m.aok = m.a.Next()
|
||||
m.bok = m.b.Next()
|
||||
}
|
||||
|
||||
return true
|
||||
}
|
||||
func (m mergedStringIter) At() string { return m.cur }
|
||||
func (m mergedStringIter) Err() error {
|
||||
if m.a.Err() != nil {
|
||||
return m.a.Err()
|
||||
}
|
||||
return m.b.Err()
|
||||
}
|
||||
|
@ -126,7 +126,7 @@ After the labels, the number of indexed chunks is encoded, followed by a sequenc
|
||||
### Label Index
|
||||
|
||||
A label index section indexes the existing (combined) values for one or more label names.
|
||||
The `#names` field determines the number of indexed label names, followed by the total number of entries in the `#entries` field. The body holds #entries / #names tuples of symbol table references, each tuple being of #names length. The value tuples are sorted in lexicographically increasing order.
|
||||
The `#names` field determines the number of indexed label names, followed by the total number of entries in the `#entries` field. The body holds #entries / #names tuples of symbol table references, each tuple being of #names length. The value tuples are sorted in lexicographically increasing order. This is no longer used.
|
||||
|
||||
```
|
||||
┌───────────────┬────────────────┬────────────────┐
|
||||
@ -181,7 +181,7 @@ The sequence of postings sections is finalized by a [postings offset table](#pos
|
||||
|
||||
A label offset table stores a sequence of label offset entries.
|
||||
Every label offset entry holds the label name and the offset to its values in the label index section.
|
||||
They are used to track label index sections. They are read into memory when an index file is loaded.
|
||||
They are used to track label index sections. This is no longer used.
|
||||
|
||||
```
|
||||
┌─────────────────────┬──────────────────────┐
|
||||
|
13
tsdb/head.go
13
tsdb/head.go
@ -1339,16 +1339,17 @@ func (h *headIndexReader) Close() error {
|
||||
return nil
|
||||
}
|
||||
|
||||
func (h *headIndexReader) Symbols() (map[string]struct{}, error) {
|
||||
func (h *headIndexReader) Symbols() index.StringIter {
|
||||
h.head.symMtx.RLock()
|
||||
defer h.head.symMtx.RUnlock()
|
||||
|
||||
res := make(map[string]struct{}, len(h.head.symbols))
|
||||
res := make([]string, 0, len(h.head.symbols))
|
||||
|
||||
for s := range h.head.symbols {
|
||||
res[s] = struct{}{}
|
||||
res = append(res, s)
|
||||
}
|
||||
return res, nil
|
||||
h.head.symMtx.RUnlock()
|
||||
|
||||
sort.Strings(res)
|
||||
return index.NewStringListIter(res)
|
||||
}
|
||||
|
||||
// LabelValues returns the possible label values
|
||||
|
@ -123,11 +123,14 @@ type Writer struct {
|
||||
buf1 encoding.Encbuf
|
||||
buf2 encoding.Encbuf
|
||||
|
||||
symbols map[string]uint32 // symbol offsets
|
||||
reverseSymbols map[uint32]string
|
||||
labelIndexes []labelIndexHashEntry // label index offsets
|
||||
postings []postingsHashEntry // postings lists offsets
|
||||
labelNames map[string]uint64 // label names, and their usage
|
||||
numSymbols int
|
||||
symbols *Symbols
|
||||
symbolFile *fileutil.MmapFile
|
||||
lastSymbol string
|
||||
|
||||
labelIndexes []labelIndexHashEntry // label index offsets
|
||||
postings []postingsHashEntry // postings lists offsets
|
||||
labelNames map[string]uint64 // label names, and their usage
|
||||
|
||||
// Hold last series to validate that clients insert new series in order.
|
||||
lastSeries labels.Labels
|
||||
@ -275,7 +278,13 @@ func (w *Writer) ensureStage(s indexWriterStage) error {
|
||||
switch s {
|
||||
case idxStageSymbols:
|
||||
w.toc.Symbols = w.pos
|
||||
if err := w.startSymbols(); err != nil {
|
||||
return err
|
||||
}
|
||||
case idxStageSeries:
|
||||
if err := w.finishSymbols(); err != nil {
|
||||
return err
|
||||
}
|
||||
w.toc.Series = w.pos
|
||||
|
||||
case idxStageLabelIndex:
|
||||
@ -339,16 +348,16 @@ func (w *Writer) AddSeries(ref uint64, lset labels.Labels, chunks ...chunks.Meta
|
||||
|
||||
for _, l := range lset {
|
||||
// here we have an index for the symbol file if v2, otherwise it's an offset
|
||||
index, ok := w.symbols[l.Name]
|
||||
if !ok {
|
||||
return errors.Errorf("symbol entry for %q does not exist", l.Name)
|
||||
index, err := w.symbols.ReverseLookup(l.Name)
|
||||
if err != nil {
|
||||
return errors.Errorf("symbol entry for %q does not exist, %v", l.Name, err)
|
||||
}
|
||||
w.labelNames[l.Name]++
|
||||
w.buf2.PutUvarint32(index)
|
||||
|
||||
index, ok = w.symbols[l.Value]
|
||||
if !ok {
|
||||
return errors.Errorf("symbol entry for %q does not exist", l.Value)
|
||||
index, err = w.symbols.ReverseLookup(l.Value)
|
||||
if err != nil {
|
||||
return errors.Errorf("symbol entry for %q does not exist, %v", l.Value, err)
|
||||
}
|
||||
w.buf2.PutUvarint32(index)
|
||||
}
|
||||
@ -388,58 +397,66 @@ func (w *Writer) AddSeries(ref uint64, lset labels.Labels, chunks ...chunks.Meta
|
||||
return nil
|
||||
}
|
||||
|
||||
func (w *Writer) AddSymbols(sym map[string]struct{}) error {
|
||||
func (w *Writer) startSymbols() error {
|
||||
// We are at w.toc.Symbols.
|
||||
// Leave 4 bytes of space for the length, and another 4 for the number of symbols
|
||||
// which will both be calculated later.
|
||||
return w.write([]byte("alenblen"))
|
||||
}
|
||||
|
||||
func (w *Writer) AddSymbol(sym string) error {
|
||||
if err := w.ensureStage(idxStageSymbols); err != nil {
|
||||
return err
|
||||
}
|
||||
// Generate sorted list of strings we will store as reference table.
|
||||
symbols := make([]string, 0, len(sym))
|
||||
|
||||
for s := range sym {
|
||||
symbols = append(symbols, s)
|
||||
if w.numSymbols != 0 && sym <= w.lastSymbol {
|
||||
return errors.Errorf("symbol %q out-of-order", sym)
|
||||
}
|
||||
sort.Strings(symbols)
|
||||
|
||||
startPos := w.pos
|
||||
// Leave 4 bytes of space for the length, which will be calculated later.
|
||||
if err := w.write([]byte("alen")); err != nil {
|
||||
return err
|
||||
}
|
||||
w.crc32.Reset()
|
||||
|
||||
w.lastSymbol = sym
|
||||
w.numSymbols++
|
||||
w.buf1.Reset()
|
||||
w.buf1.PutBE32int(len(symbols))
|
||||
w.buf1.WriteToHash(w.crc32)
|
||||
if err := w.write(w.buf1.Get()); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
w.symbols = make(map[string]uint32, len(symbols))
|
||||
w.reverseSymbols = make(map[uint32]string, len(symbols))
|
||||
|
||||
for index, s := range symbols {
|
||||
w.symbols[s] = uint32(index)
|
||||
w.reverseSymbols[uint32(index)] = s
|
||||
w.buf1.Reset()
|
||||
w.buf1.PutUvarintStr(s)
|
||||
w.buf1.WriteToHash(w.crc32)
|
||||
if err := w.write(w.buf1.Get()); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
// Write out the length.
|
||||
w.buf1.Reset()
|
||||
w.buf1.PutBE32int(int(w.pos - startPos - 4))
|
||||
if err := w.writeAt(w.buf1.Get(), startPos); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
w.buf1.Reset()
|
||||
w.buf1.PutHashSum(w.crc32)
|
||||
w.buf1.PutUvarintStr(sym)
|
||||
return w.write(w.buf1.Get())
|
||||
}
|
||||
|
||||
func (w *Writer) finishSymbols() error {
|
||||
// Write out the length and symbol count.
|
||||
w.buf1.Reset()
|
||||
w.buf1.PutBE32int(int(w.pos - w.toc.Symbols - 4))
|
||||
w.buf1.PutBE32int(int(w.numSymbols))
|
||||
if err := w.writeAt(w.buf1.Get(), w.toc.Symbols); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
hashPos := w.pos
|
||||
// Leave space for the hash. We can only calculate it
|
||||
// now that the number of symbols is known, so mmap and do it from there.
|
||||
if err := w.write([]byte("hash")); err != nil {
|
||||
return err
|
||||
}
|
||||
if err := w.fbuf.Flush(); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
var err error
|
||||
w.symbolFile, err = fileutil.OpenMmapFile(w.f.Name())
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
hash := crc32.Checksum(w.symbolFile.Bytes()[w.toc.Symbols+4:hashPos], castagnoliTable)
|
||||
w.buf1.Reset()
|
||||
w.buf1.PutBE32(hash)
|
||||
if err := w.writeAt(w.buf1.Get(), hashPos); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// Load in the symbol table efficiently for the rest of the index writing.
|
||||
w.symbols, err = NewSymbols(realByteSlice(w.symbolFile.Bytes()), FormatV2, int(w.toc.Symbols))
|
||||
if err != nil {
|
||||
return errors.Wrap(err, "read symbols")
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (w *Writer) WriteLabelIndex(names []string, values []string) error {
|
||||
if len(values)%len(names) != 0 {
|
||||
return errors.Errorf("invalid value list length %d for %d names", len(values), len(names))
|
||||
@ -481,12 +498,12 @@ func (w *Writer) WriteLabelIndex(names []string, values []string) error {
|
||||
|
||||
// here we have an index for the symbol file if v2, otherwise it's an offset
|
||||
for _, v := range valt.entries {
|
||||
index, ok := w.symbols[v]
|
||||
if !ok {
|
||||
return errors.Errorf("symbol entry for %q does not exist", v)
|
||||
sid, err := w.symbols.ReverseLookup(v)
|
||||
if err != nil {
|
||||
return errors.Errorf("symbol entry for %q does not exist: %v", v, err)
|
||||
}
|
||||
w.buf1.Reset()
|
||||
w.buf1.PutBE32(index)
|
||||
w.buf1.PutBE32(sid)
|
||||
w.buf1.WriteToHash(w.crc32)
|
||||
if err := w.write(w.buf1.Get()); err != nil {
|
||||
return err
|
||||
@ -585,7 +602,7 @@ func (w *Writer) writePostingsOffsetTable() error {
|
||||
return w.write(w.buf1.Get())
|
||||
}
|
||||
|
||||
const indexTOCLen = 6*8 + 4
|
||||
const indexTOCLen = 6*8 + crc32.Size
|
||||
|
||||
func (w *Writer) writeTOC() error {
|
||||
w.buf1.Reset()
|
||||
@ -629,8 +646,8 @@ func (w *Writer) writePostings() error {
|
||||
return errors.Errorf("series not 16-byte aligned at %d", startPos)
|
||||
}
|
||||
offsets = append(offsets, uint32(startPos/16))
|
||||
// Skip to next series. The 4 is for the CRC32.
|
||||
d.Skip(d.Uvarint() + 4)
|
||||
// Skip to next series.
|
||||
d.Skip(d.Uvarint() + crc32.Size)
|
||||
if err := d.Err(); err != nil {
|
||||
return nil
|
||||
}
|
||||
@ -654,9 +671,13 @@ func (w *Writer) writePostings() error {
|
||||
names = names[1:]
|
||||
}
|
||||
|
||||
nameSymbols := map[uint32]struct{}{}
|
||||
nameSymbols := map[uint32]string{}
|
||||
for _, name := range batchNames {
|
||||
nameSymbols[w.symbols[name]] = struct{}{}
|
||||
sid, err := w.symbols.ReverseLookup(name)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
nameSymbols[sid] = name
|
||||
}
|
||||
// Label name -> label value -> positions.
|
||||
postings := map[uint32]map[uint32][]uint32{}
|
||||
@ -679,14 +700,11 @@ func (w *Writer) writePostings() error {
|
||||
if _, ok := postings[lno]; !ok {
|
||||
postings[lno] = map[uint32][]uint32{}
|
||||
}
|
||||
if _, ok := postings[lno][lvo]; !ok {
|
||||
postings[lno][lvo] = []uint32{}
|
||||
}
|
||||
postings[lno][lvo] = append(postings[lno][lvo], uint32(startPos/16))
|
||||
}
|
||||
}
|
||||
// Skip to next series. The 4 is for the CRC32.
|
||||
d.Skip(l - (startLen - d.Len()) + 4)
|
||||
// Skip to next series.
|
||||
d.Skip(l - (startLen - d.Len()) + crc32.Size)
|
||||
if err := d.Err(); err != nil {
|
||||
return nil
|
||||
}
|
||||
@ -694,15 +712,23 @@ func (w *Writer) writePostings() error {
|
||||
|
||||
for _, name := range batchNames {
|
||||
// Write out postings for this label name.
|
||||
values := make([]uint32, 0, len(postings[w.symbols[name]]))
|
||||
for v := range postings[w.symbols[name]] {
|
||||
sid, err := w.symbols.ReverseLookup(name)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
values := make([]uint32, 0, len(postings[sid]))
|
||||
for v := range postings[sid] {
|
||||
values = append(values, v)
|
||||
|
||||
}
|
||||
// Symbol numbers are in order, so the strings will also be in order.
|
||||
sort.Sort(uint32slice(values))
|
||||
for _, v := range values {
|
||||
if err := w.writePosting(name, w.reverseSymbols[v], postings[w.symbols[name]][v]); err != nil {
|
||||
value, err := w.symbols.Lookup(v)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if err := w.writePosting(name, value, postings[sid][v]); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
@ -765,6 +791,11 @@ func (w *Writer) Close() error {
|
||||
if err := w.ensureStage(idxStageDone); err != nil {
|
||||
return err
|
||||
}
|
||||
if w.symbolFile != nil {
|
||||
if err := w.symbolFile.Close(); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
if err := w.fbuf.Flush(); err != nil {
|
||||
return err
|
||||
}
|
||||
@ -782,6 +813,18 @@ type StringTuples interface {
|
||||
At(i int) ([]string, error)
|
||||
}
|
||||
|
||||
// StringIter iterates over a sorted list of strings.
|
||||
type StringIter interface {
|
||||
// Next advances the iterator and returns true if another value was found.
|
||||
Next() bool
|
||||
|
||||
// At returns the value at the current iterator position.
|
||||
At() string
|
||||
|
||||
// Err returns the last error of the iterator.
|
||||
Err() error
|
||||
}
|
||||
|
||||
type Reader struct {
|
||||
b ByteSlice
|
||||
toc *TOC
|
||||
@ -1038,6 +1081,9 @@ func (s Symbols) Lookup(o uint32) (string, error) {
|
||||
}
|
||||
|
||||
func (s Symbols) ReverseLookup(sym string) (uint32, error) {
|
||||
if len(s.offsets) == 0 {
|
||||
return 0, errors.Errorf("unknown symbol %q - no symbols", sym)
|
||||
}
|
||||
i := sort.Search(len(s.offsets), func(i int) bool {
|
||||
// Any decoding errors here will be lost, however
|
||||
// we already read through all of this at startup.
|
||||
@ -1077,24 +1123,43 @@ func (s Symbols) ReverseLookup(sym string) (uint32, error) {
|
||||
return uint32(s.bs.Len() - lastLen), nil
|
||||
}
|
||||
|
||||
func (s Symbols) All() (map[string]struct{}, error) {
|
||||
d := encoding.NewDecbufAt(s.bs, s.off, castagnoliTable)
|
||||
cnt := d.Be32int()
|
||||
res := make(map[string]struct{}, cnt)
|
||||
for d.Err() == nil && cnt > 0 {
|
||||
res[d.UvarintStr()] = struct{}{}
|
||||
cnt--
|
||||
}
|
||||
if d.Err() != nil {
|
||||
return nil, d.Err()
|
||||
}
|
||||
return res, nil
|
||||
}
|
||||
|
||||
func (s Symbols) Size() int {
|
||||
return len(s.offsets) * 8
|
||||
}
|
||||
|
||||
func (s Symbols) Iter() StringIter {
|
||||
d := encoding.NewDecbufAt(s.bs, s.off, castagnoliTable)
|
||||
cnt := d.Be32int()
|
||||
return &symbolsIter{
|
||||
d: d,
|
||||
cnt: cnt,
|
||||
}
|
||||
}
|
||||
|
||||
// symbolsIter implements StringIter.
|
||||
type symbolsIter struct {
|
||||
d encoding.Decbuf
|
||||
cnt int
|
||||
cur string
|
||||
err error
|
||||
}
|
||||
|
||||
func (s *symbolsIter) Next() bool {
|
||||
if s.cnt == 0 || s.err != nil {
|
||||
return false
|
||||
}
|
||||
s.cur = yoloString(s.d.UvarintBytes())
|
||||
s.cnt--
|
||||
if s.d.Err() != nil {
|
||||
s.err = s.d.Err()
|
||||
return false
|
||||
}
|
||||
return true
|
||||
}
|
||||
|
||||
func (s symbolsIter) At() string { return s.cur }
|
||||
func (s symbolsIter) Err() error { return s.err }
|
||||
|
||||
// ReadOffsetTable reads an offset table and at the given position calls f for each
|
||||
// found entry. If f returns an error it stops decoding and returns the received error.
|
||||
func ReadOffsetTable(bs ByteSlice, off uint64, f func([]string, uint64, int) error) error {
|
||||
@ -1137,9 +1202,9 @@ func (r *Reader) lookupSymbol(o uint32) (string, error) {
|
||||
return r.symbols.Lookup(o)
|
||||
}
|
||||
|
||||
// Symbols returns a set of symbols that exist within the index.
|
||||
func (r *Reader) Symbols() (map[string]struct{}, error) {
|
||||
return r.symbols.All()
|
||||
// Symbols returns an iterator over the symbols that exist within the index.
|
||||
func (r *Reader) Symbols() StringIter {
|
||||
return r.symbols.Iter()
|
||||
}
|
||||
|
||||
// SymbolTableSize returns the symbol table size in bytes.
|
||||
@ -1359,6 +1424,28 @@ func (t *stringTuples) Less(i, j int) bool {
|
||||
return false
|
||||
}
|
||||
|
||||
// NewStringListIterator returns a StringIter for the given sorted list of strings.
|
||||
func NewStringListIter(s []string) StringIter {
|
||||
return &stringListIter{l: s}
|
||||
}
|
||||
|
||||
// symbolsIter implements StringIter.
|
||||
type stringListIter struct {
|
||||
l []string
|
||||
cur string
|
||||
}
|
||||
|
||||
func (s *stringListIter) Next() bool {
|
||||
if len(s.l) == 0 {
|
||||
return false
|
||||
}
|
||||
s.cur = s.l[0]
|
||||
s.l = s.l[1:]
|
||||
return true
|
||||
}
|
||||
func (s stringListIter) At() string { return s.cur }
|
||||
func (s stringListIter) Err() error { return nil }
|
||||
|
||||
// Decoder provides decoding methods for the v1 and v2 index file format.
|
||||
//
|
||||
// It currently does not contain decoding methods for all entry types but can be extended
|
||||
|
@ -186,15 +186,12 @@ func TestIndexRW_Postings(t *testing.T) {
|
||||
labels.FromStrings("a", "1", "b", "4"),
|
||||
}
|
||||
|
||||
err = iw.AddSymbols(map[string]struct{}{
|
||||
"a": {},
|
||||
"b": {},
|
||||
"1": {},
|
||||
"2": {},
|
||||
"3": {},
|
||||
"4": {},
|
||||
})
|
||||
testutil.Ok(t, err)
|
||||
testutil.Ok(t, iw.AddSymbol("1"))
|
||||
testutil.Ok(t, iw.AddSymbol("2"))
|
||||
testutil.Ok(t, iw.AddSymbol("3"))
|
||||
testutil.Ok(t, iw.AddSymbol("4"))
|
||||
testutil.Ok(t, iw.AddSymbol("a"))
|
||||
testutil.Ok(t, iw.AddSymbol("b"))
|
||||
|
||||
// Postings lists are only written if a series with the respective
|
||||
// reference was added before.
|
||||
@ -280,7 +277,14 @@ func TestPostingsMany(t *testing.T) {
|
||||
symbols["i"] = struct{}{}
|
||||
symbols["foo"] = struct{}{}
|
||||
symbols["bar"] = struct{}{}
|
||||
testutil.Ok(t, iw.AddSymbols(symbols))
|
||||
syms := []string{}
|
||||
for s := range symbols {
|
||||
syms = append(syms, s)
|
||||
}
|
||||
sort.Strings(syms)
|
||||
for _, s := range syms {
|
||||
testutil.Ok(t, iw.AddSymbol(s))
|
||||
}
|
||||
|
||||
for i, s := range series {
|
||||
testutil.Ok(t, iw.AddSeries(uint64(i), s))
|
||||
@ -390,7 +394,14 @@ func TestPersistence_index_e2e(t *testing.T) {
|
||||
iw, err := NewWriter(context.Background(), filepath.Join(dir, indexFilename))
|
||||
testutil.Ok(t, err)
|
||||
|
||||
testutil.Ok(t, iw.AddSymbols(symbols))
|
||||
syms := []string{}
|
||||
for s := range symbols {
|
||||
syms = append(syms, s)
|
||||
}
|
||||
sort.Strings(syms)
|
||||
for _, s := range syms {
|
||||
testutil.Ok(t, iw.AddSymbol(s))
|
||||
}
|
||||
|
||||
// Population procedure as done by compaction.
|
||||
var (
|
||||
@ -479,14 +490,18 @@ func TestPersistence_index_e2e(t *testing.T) {
|
||||
}
|
||||
}
|
||||
|
||||
gotSymbols, err := ir.Symbols()
|
||||
testutil.Ok(t, err)
|
||||
|
||||
testutil.Equals(t, len(mi.symbols), len(gotSymbols))
|
||||
for s := range mi.symbols {
|
||||
_, ok := gotSymbols[s]
|
||||
testutil.Assert(t, ok, "")
|
||||
gotSymbols := []string{}
|
||||
it := ir.Symbols()
|
||||
for it.Next() {
|
||||
gotSymbols = append(gotSymbols, it.At())
|
||||
}
|
||||
testutil.Ok(t, it.Err())
|
||||
expSymbols := []string{}
|
||||
for s := range mi.symbols {
|
||||
expSymbols = append(expSymbols, s)
|
||||
}
|
||||
sort.Strings(expSymbols)
|
||||
testutil.Equals(t, expSymbols, gotSymbols)
|
||||
|
||||
testutil.Ok(t, ir.Close())
|
||||
}
|
||||
|
@ -24,7 +24,7 @@ type mockIndexWriter struct {
|
||||
series []seriesSamples
|
||||
}
|
||||
|
||||
func (mockIndexWriter) AddSymbols(sym map[string]struct{}) error { return nil }
|
||||
func (mockIndexWriter) AddSymbol(sym string) error { return nil }
|
||||
func (m *mockIndexWriter) AddSeries(ref uint64, l labels.Labels, chunks ...chunks.Meta) error {
|
||||
i := -1
|
||||
for j, s := range m.series {
|
||||
|
@ -1316,8 +1316,13 @@ func newMockIndex() mockIndex {
|
||||
return ix
|
||||
}
|
||||
|
||||
func (m mockIndex) Symbols() (map[string]struct{}, error) {
|
||||
return m.symbols, nil
|
||||
func (m mockIndex) Symbols() index.StringIter {
|
||||
l := []string{}
|
||||
for s := range m.symbols {
|
||||
l = append(l, s)
|
||||
}
|
||||
sort.Strings(l)
|
||||
return index.NewStringListIter(l)
|
||||
}
|
||||
|
||||
func (m *mockIndex) AddSeries(ref uint64, l labels.Labels, chunks ...chunks.Meta) error {
|
||||
|
Loading…
Reference in New Issue
Block a user