Persist series without allocating the full set
Change index persistence for series to not be accumulated in memory before being written as one large batch. `Labels` and `ChunkMeta` objects are reused. This cuts down memory spikes during compaction of multiple blocks significantly. As part of the the Index{Reader,Writer} now have an explicit notion of symbols and series must be inserted in order.
This commit is contained in:
parent
1875d05e00
commit
96d7f540d4
|
@ -69,8 +69,6 @@ The file offset to the beginning of a series serves as the series' ID in all sub
|
|||
|
||||
```
|
||||
┌───────────────────────────────────────┐
|
||||
│ #series <4b> │
|
||||
├───────────────────────────────────────┤
|
||||
│ ┌───────────────────────────────────┐ │
|
||||
│ │ series_1 │ │
|
||||
│ ├───────────────────────────────────┤ │
|
||||
|
|
9
block.go
9
block.go
|
@ -251,9 +251,12 @@ func (pb *persistedBlock) Delete(mint, maxt int64, ms ...labels.Matcher) error {
|
|||
// Choose only valid postings which have chunks in the time-range.
|
||||
stones := map[uint32]intervals{}
|
||||
|
||||
var lset labels.Labels
|
||||
var chks []*ChunkMeta
|
||||
|
||||
Outer:
|
||||
for p.Next() {
|
||||
lset, chunks, err := ir.Series(p.At())
|
||||
err := ir.Series(p.At(), &lset, &chks)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
@ -264,10 +267,10 @@ Outer:
|
|||
}
|
||||
}
|
||||
|
||||
for _, chk := range chunks {
|
||||
for _, chk := range chks {
|
||||
if intervalOverlap(mint, maxt, chk.MinTime, chk.MaxTime) {
|
||||
// Delete only until the current vlaues and not beyond.
|
||||
tmin, tmax := clampInterval(mint, maxt, chunks[0].MinTime, chunks[len(chunks)-1].MaxTime)
|
||||
tmin, tmax := clampInterval(mint, maxt, chks[0].MinTime, chks[len(chks)-1].MaxTime)
|
||||
stones[p.At()] = intervals{{tmin, tmax}}
|
||||
continue Outer
|
||||
}
|
||||
|
|
|
@ -13,10 +13,7 @@
|
|||
|
||||
package chunks
|
||||
|
||||
import (
|
||||
"encoding/binary"
|
||||
"fmt"
|
||||
)
|
||||
import "fmt"
|
||||
|
||||
// Encoding is the identifier for a chunk encoding.
|
||||
type Encoding uint8
|
||||
|
@ -50,10 +47,7 @@ type Chunk interface {
|
|||
func FromData(e Encoding, d []byte) (Chunk, error) {
|
||||
switch e {
|
||||
case EncXOR:
|
||||
return &XORChunk{
|
||||
b: &bstream{count: 0, stream: d},
|
||||
num: binary.BigEndian.Uint16(d),
|
||||
}, nil
|
||||
return &XORChunk{b: &bstream{count: 0, stream: d}}, nil
|
||||
}
|
||||
return nil, fmt.Errorf("unknown chunk encoding: %d", e)
|
||||
}
|
||||
|
|
|
@ -52,8 +52,7 @@ import (
|
|||
|
||||
// XORChunk holds XOR encoded sample data.
|
||||
type XORChunk struct {
|
||||
b *bstream
|
||||
num uint16
|
||||
b *bstream
|
||||
}
|
||||
|
||||
// NewXORChunk returns a new chunk with XOR encoding of the given size.
|
||||
|
|
55
compact.go
55
compact.go
|
@ -398,17 +398,31 @@ func (c *compactor) write(uid ulid.ULID, blocks ...Block) (err error) {
|
|||
// populateBlock fills the index and chunk writers with new data gathered as the union
|
||||
// of the provided blocks. It returns meta information for the new block.
|
||||
func populateBlock(blocks []Block, indexw IndexWriter, chunkw ChunkWriter) (*BlockMeta, error) {
|
||||
var set compactionSet
|
||||
var metas []BlockMeta
|
||||
|
||||
var (
|
||||
set compactionSet
|
||||
metas []BlockMeta
|
||||
allSymbols = make(map[string]struct{}, 1<<16)
|
||||
)
|
||||
for i, b := range blocks {
|
||||
metas = append(metas, b.Meta())
|
||||
|
||||
all, err := b.Index().Postings("", "")
|
||||
symbols, err := b.Index().Symbols()
|
||||
if err != nil {
|
||||
return nil, errors.Wrap(err, "read symbols")
|
||||
}
|
||||
for s := range symbols {
|
||||
allSymbols[s] = struct{}{}
|
||||
}
|
||||
|
||||
indexr := b.Index()
|
||||
|
||||
all, err := indexr.Postings("", "")
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
s := newCompactionSeriesSet(b.Index(), b.Chunks(), b.Tombstones(), all)
|
||||
all = indexr.SortedPostings(all)
|
||||
|
||||
s := newCompactionSeriesSet(indexr, b.Chunks(), b.Tombstones(), all)
|
||||
|
||||
if i == 0 {
|
||||
set = s
|
||||
|
@ -428,6 +442,10 @@ func populateBlock(blocks []Block, indexw IndexWriter, chunkw ChunkWriter) (*Blo
|
|||
meta = compactBlockMetas(metas...)
|
||||
)
|
||||
|
||||
if err := indexw.AddSymbols(allSymbols); err != nil {
|
||||
return nil, errors.Wrap(err, "add symbols")
|
||||
}
|
||||
|
||||
for set.Next() {
|
||||
lset, chks, dranges := set.At() // The chunks here are not fully deleted.
|
||||
|
||||
|
@ -461,7 +479,9 @@ func populateBlock(blocks []Block, indexw IndexWriter, chunkw ChunkWriter) (*Blo
|
|||
return nil, err
|
||||
}
|
||||
|
||||
indexw.AddSeries(i, lset, chks...)
|
||||
if err := indexw.AddSeries(i, lset, chks...); err != nil {
|
||||
return nil, errors.Wrapf(err, "add series")
|
||||
}
|
||||
|
||||
meta.Stats.NumChunks += uint64(len(chks))
|
||||
meta.Stats.NumSeries++
|
||||
|
@ -525,6 +545,7 @@ type compactionSeriesSet struct {
|
|||
index IndexReader
|
||||
chunks ChunkReader
|
||||
tombstones TombstoneReader
|
||||
series SeriesSet
|
||||
|
||||
l labels.Labels
|
||||
c []*ChunkMeta
|
||||
|
@ -545,11 +566,9 @@ func (c *compactionSeriesSet) Next() bool {
|
|||
if !c.p.Next() {
|
||||
return false
|
||||
}
|
||||
|
||||
c.intervals = c.tombstones.Get(c.p.At())
|
||||
|
||||
c.l, c.c, c.err = c.index.Series(c.p.At())
|
||||
if c.err != nil {
|
||||
if c.err = c.index.Series(c.p.At(), &c.l, &c.c); c.err != nil {
|
||||
return false
|
||||
}
|
||||
|
||||
|
@ -629,14 +648,24 @@ func (c *compactionMerger) Next() bool {
|
|||
if !c.aok && !c.bok || c.Err() != nil {
|
||||
return false
|
||||
}
|
||||
// While advancing child iterators the memory used for labels and chunks
|
||||
// may be reused. When picking a series we have to store the result.
|
||||
var lset labels.Labels
|
||||
var chks []*ChunkMeta
|
||||
|
||||
d := c.compare()
|
||||
// Both sets contain the current series. Chain them into a single one.
|
||||
if d > 0 {
|
||||
c.l, c.c, c.intervals = c.b.At()
|
||||
lset, chks, c.intervals = c.b.At()
|
||||
c.l = append(c.l[:0], lset...)
|
||||
c.c = append(c.c[:0], chks...)
|
||||
|
||||
c.bok = c.b.Next()
|
||||
} else if d < 0 {
|
||||
c.l, c.c, c.intervals = c.a.At()
|
||||
lset, chks, c.intervals = c.a.At()
|
||||
c.l = append(c.l[:0], lset...)
|
||||
c.c = append(c.c[:0], chks...)
|
||||
|
||||
c.aok = c.a.Next()
|
||||
} else {
|
||||
l, ca, ra := c.a.At()
|
||||
|
@ -645,8 +674,8 @@ func (c *compactionMerger) Next() bool {
|
|||
ra = ra.add(r)
|
||||
}
|
||||
|
||||
c.l = l
|
||||
c.c = append(ca, cb...)
|
||||
c.l = append(c.l[:0], l...)
|
||||
c.c = append(append(c.c[:0], ca...), cb...)
|
||||
c.intervals = ra
|
||||
|
||||
c.aok = c.a.Next()
|
||||
|
|
87
head.go
87
head.go
|
@ -67,6 +67,7 @@ type HeadBlock struct {
|
|||
// to their chunk descs.
|
||||
hashes map[uint64][]*memSeries
|
||||
|
||||
symbols map[string]struct{}
|
||||
values map[string]stringset // label names to possible values
|
||||
postings *memPostings // postings lists for terms
|
||||
|
||||
|
@ -117,6 +118,7 @@ func OpenHeadBlock(dir string, l log.Logger, wal WAL) (*HeadBlock, error) {
|
|||
series: []*memSeries{nil}, // 0 is not a valid posting, filled with nil.
|
||||
hashes: map[uint64][]*memSeries{},
|
||||
values: map[string]stringset{},
|
||||
symbols: map[string]struct{}{},
|
||||
postings: &memPostings{m: make(map[term][]uint32)},
|
||||
meta: *meta,
|
||||
tombstones: newEmptyTombstoneReader(),
|
||||
|
@ -332,7 +334,12 @@ func (h *HeadBlock) Snapshot(snapshotDir string) error {
|
|||
func (h *HeadBlock) Dir() string { return h.dir }
|
||||
|
||||
// Index returns an IndexReader against the block.
|
||||
func (h *HeadBlock) Index() IndexReader { return &headIndexReader{h} }
|
||||
func (h *HeadBlock) Index() IndexReader {
|
||||
h.mtx.RLock()
|
||||
defer h.mtx.RUnlock()
|
||||
|
||||
return &headIndexReader{HeadBlock: h, maxSeries: uint32(len(h.series) - 1)}
|
||||
}
|
||||
|
||||
// Chunks returns a ChunkReader against the block.
|
||||
func (h *HeadBlock) Chunks() ChunkReader { return &headChunkReader{h} }
|
||||
|
@ -340,14 +347,10 @@ func (h *HeadBlock) Chunks() ChunkReader { return &headChunkReader{h} }
|
|||
// Querier returns a new Querier against the block for the range [mint, maxt].
|
||||
func (h *HeadBlock) Querier(mint, maxt int64) Querier {
|
||||
h.mtx.RLock()
|
||||
defer h.mtx.RUnlock()
|
||||
|
||||
if h.closed {
|
||||
panic(fmt.Sprintf("block %s already closed", h.dir))
|
||||
}
|
||||
|
||||
// Reference on the original slice to use for postings mapping.
|
||||
series := h.series[:]
|
||||
h.mtx.RUnlock()
|
||||
|
||||
return &blockQuerier{
|
||||
mint: mint,
|
||||
|
@ -355,27 +358,6 @@ func (h *HeadBlock) Querier(mint, maxt int64) Querier {
|
|||
index: h.Index(),
|
||||
chunks: h.Chunks(),
|
||||
tombstones: h.Tombstones(),
|
||||
|
||||
postingsMapper: func(p Postings) Postings {
|
||||
ep := make([]uint32, 0, 64)
|
||||
|
||||
for p.Next() {
|
||||
// Skip posting entries that include series added after we
|
||||
// instantiated the querier.
|
||||
if int(p.At()) >= len(series) {
|
||||
break
|
||||
}
|
||||
ep = append(ep, p.At())
|
||||
}
|
||||
if err := p.Err(); err != nil {
|
||||
return errPostings{err: errors.Wrap(err, "expand postings")}
|
||||
}
|
||||
|
||||
sort.Slice(ep, func(i, j int) bool {
|
||||
return labels.Compare(series[ep[i]].lset, series[ep[j]].lset) < 0
|
||||
})
|
||||
return newListPostings(ep)
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -661,6 +643,12 @@ func (c *safeChunk) Iterator() chunks.Iterator {
|
|||
|
||||
type headIndexReader struct {
|
||||
*HeadBlock
|
||||
// Highest series that existed when the index reader was instantiated.
|
||||
maxSeries uint32
|
||||
}
|
||||
|
||||
func (h *headIndexReader) Symbols() (map[string]struct{}, error) {
|
||||
return h.symbols, nil
|
||||
}
|
||||
|
||||
// LabelValues returns the possible label values
|
||||
|
@ -689,33 +677,59 @@ func (h *headIndexReader) Postings(name, value string) (Postings, error) {
|
|||
return h.postings.get(term{name: name, value: value}), nil
|
||||
}
|
||||
|
||||
// Series returns the series for the given reference.
|
||||
func (h *headIndexReader) Series(ref uint32) (labels.Labels, []*ChunkMeta, error) {
|
||||
func (h *headIndexReader) SortedPostings(p Postings) Postings {
|
||||
h.mtx.RLock()
|
||||
defer h.mtx.RUnlock()
|
||||
|
||||
if int(ref) >= len(h.series) {
|
||||
return nil, nil, ErrNotFound
|
||||
ep := make([]uint32, 0, 1024)
|
||||
|
||||
for p.Next() {
|
||||
// Skip posting entries that include series added after we
|
||||
// instantiated the index reader.
|
||||
if p.At() > h.maxSeries {
|
||||
break
|
||||
}
|
||||
ep = append(ep, p.At())
|
||||
}
|
||||
if err := p.Err(); err != nil {
|
||||
return errPostings{err: errors.Wrap(err, "expand postings")}
|
||||
}
|
||||
|
||||
sort.Slice(ep, func(i, j int) bool {
|
||||
return labels.Compare(h.series[ep[i]].lset, h.series[ep[j]].lset) < 0
|
||||
})
|
||||
return newListPostings(ep)
|
||||
}
|
||||
|
||||
// Series returns the series for the given reference.
|
||||
func (h *headIndexReader) Series(ref uint32, lbls *labels.Labels, chks *[]*ChunkMeta) error {
|
||||
h.mtx.RLock()
|
||||
defer h.mtx.RUnlock()
|
||||
|
||||
if ref > h.maxSeries {
|
||||
return ErrNotFound
|
||||
}
|
||||
|
||||
s := h.series[ref]
|
||||
if s == nil {
|
||||
return nil, nil, ErrNotFound
|
||||
return ErrNotFound
|
||||
}
|
||||
metas := make([]*ChunkMeta, 0, len(s.chunks))
|
||||
*lbls = append((*lbls)[:0], s.lset...)
|
||||
|
||||
s.mtx.RLock()
|
||||
defer s.mtx.RUnlock()
|
||||
|
||||
*chks = (*chks)[:0]
|
||||
|
||||
for i, c := range s.chunks {
|
||||
metas = append(metas, &ChunkMeta{
|
||||
*chks = append(*chks, &ChunkMeta{
|
||||
MinTime: c.minTime,
|
||||
MaxTime: c.maxTime,
|
||||
Ref: (uint64(ref) << 32) | uint64(i),
|
||||
})
|
||||
}
|
||||
|
||||
return s.lset, metas, nil
|
||||
return nil
|
||||
}
|
||||
|
||||
func (h *headIndexReader) LabelIndices() ([][]string, error) {
|
||||
|
@ -760,6 +774,9 @@ func (h *HeadBlock) create(hash uint64, lset labels.Labels) *memSeries {
|
|||
valset.set(l.Value)
|
||||
|
||||
h.postings.add(s.ref, term{name: l.Name, value: l.Value})
|
||||
|
||||
h.symbols[l.Name] = struct{}{}
|
||||
h.symbols[l.Value] = struct{}{}
|
||||
}
|
||||
|
||||
h.postings.add(s.ref, term{})
|
||||
|
|
241
index.go
241
index.go
|
@ -61,7 +61,9 @@ func (s indexWriterSeriesSlice) Less(i, j int) bool {
|
|||
type indexWriterStage uint8
|
||||
|
||||
const (
|
||||
idxStagePopulate indexWriterStage = iota
|
||||
idxStageNone indexWriterStage = iota
|
||||
idxStageSymbols
|
||||
idxStageSeries
|
||||
idxStageLabelIndex
|
||||
idxStagePostings
|
||||
idxStageDone
|
||||
|
@ -69,8 +71,12 @@ const (
|
|||
|
||||
func (s indexWriterStage) String() string {
|
||||
switch s {
|
||||
case idxStagePopulate:
|
||||
return "populate"
|
||||
case idxStageNone:
|
||||
return "none"
|
||||
case idxStageSymbols:
|
||||
return "symbols"
|
||||
case idxStageSeries:
|
||||
return "series"
|
||||
case idxStageLabelIndex:
|
||||
return "label index"
|
||||
case idxStagePostings:
|
||||
|
@ -82,12 +88,18 @@ func (s indexWriterStage) String() string {
|
|||
}
|
||||
|
||||
// IndexWriter serializes the index for a block of series data.
|
||||
// The methods must generally be called in the order they are specified in.
|
||||
// The methods must be called in the order they are specified in.
|
||||
type IndexWriter interface {
|
||||
// AddSymbols registers all string symbols that are encountered in series
|
||||
// and other indices.
|
||||
AddSymbols(sym map[string]struct{}) error
|
||||
|
||||
// AddSeries populates the index writer with a series and its offsets
|
||||
// of chunks that the index can reference.
|
||||
// The reference number is used to resolve a series against the postings
|
||||
// list iterator. It only has to be available during the write processing.
|
||||
// Implementations may require series to be insert in increasing order by
|
||||
// their labels.
|
||||
// The reference numbers are used to resolve entries in postings lists that
|
||||
// are added later.
|
||||
AddSeries(ref uint32, l labels.Labels, chunks ...*ChunkMeta) error
|
||||
|
||||
// WriteLabelIndex serializes an index from label names to values.
|
||||
|
@ -118,10 +130,13 @@ type indexWriter struct {
|
|||
buf2 encbuf
|
||||
uint32s []uint32
|
||||
|
||||
series map[uint32]*indexWriterSeries
|
||||
symbols map[string]uint32 // symbol offsets
|
||||
labelIndexes []hashEntry // label index offsets
|
||||
postings []hashEntry // postings lists offsets
|
||||
symbols map[string]uint32 // symbol offsets
|
||||
seriesOffsets map[uint32]uint64 // offsets of series
|
||||
labelIndexes []hashEntry // label index offsets
|
||||
postings []hashEntry // postings lists offsets
|
||||
|
||||
// Hold last series to validate that clients insert new series in order.
|
||||
lastSeries labels.Labels
|
||||
|
||||
crc32 hash.Hash
|
||||
}
|
||||
|
@ -152,7 +167,7 @@ func newIndexWriter(dir string) (*indexWriter, error) {
|
|||
f: f,
|
||||
fbuf: bufio.NewWriterSize(f, 1<<22),
|
||||
pos: 0,
|
||||
stage: idxStagePopulate,
|
||||
stage: idxStageNone,
|
||||
|
||||
// Reusable memory.
|
||||
buf1: encbuf{b: make([]byte, 0, 1<<22)},
|
||||
|
@ -160,9 +175,9 @@ func newIndexWriter(dir string) (*indexWriter, error) {
|
|||
uint32s: make([]uint32, 0, 1<<15),
|
||||
|
||||
// Caches.
|
||||
symbols: make(map[string]uint32, 1<<13),
|
||||
series: make(map[uint32]*indexWriterSeries, 1<<16),
|
||||
crc32: crc32.New(crc32.MakeTable(crc32.Castagnoli)),
|
||||
symbols: make(map[string]uint32, 1<<13),
|
||||
seriesOffsets: make(map[uint32]uint64, 1<<16),
|
||||
crc32: crc32.New(crc32.MakeTable(crc32.Castagnoli)),
|
||||
}
|
||||
if err := iw.writeMeta(); err != nil {
|
||||
return nil, err
|
||||
|
@ -207,20 +222,13 @@ func (w *indexWriter) ensureStage(s indexWriterStage) error {
|
|||
return errors.Errorf("invalid stage %q, currently at %q", s, w.stage)
|
||||
}
|
||||
|
||||
// Complete population stage by writing symbols and series.
|
||||
if w.stage == idxStagePopulate {
|
||||
w.toc.symbols = w.pos
|
||||
if err := w.writeSymbols(); err != nil {
|
||||
return err
|
||||
}
|
||||
w.toc.series = w.pos
|
||||
if err := w.writeSeries(); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
// Mark start of sections in table of contents.
|
||||
switch s {
|
||||
case idxStageSymbols:
|
||||
w.toc.symbols = w.pos
|
||||
case idxStageSeries:
|
||||
w.toc.series = w.pos
|
||||
|
||||
case idxStageLabelIndex:
|
||||
w.toc.labelIndices = w.pos
|
||||
|
||||
|
@ -254,26 +262,65 @@ func (w *indexWriter) writeMeta() error {
|
|||
}
|
||||
|
||||
func (w *indexWriter) AddSeries(ref uint32, lset labels.Labels, chunks ...*ChunkMeta) error {
|
||||
if _, ok := w.series[ref]; ok {
|
||||
return errors.Errorf("series with reference %d already added", ref)
|
||||
if err := w.ensureStage(idxStageSeries); err != nil {
|
||||
return err
|
||||
}
|
||||
// Populate the symbol table from all label sets we have to reference.
|
||||
for _, l := range lset {
|
||||
w.symbols[l.Name] = 0
|
||||
w.symbols[l.Value] = 0
|
||||
if labels.Compare(lset, w.lastSeries) <= 0 {
|
||||
return errors.Errorf("out-of-order series added with label set %q", lset)
|
||||
}
|
||||
|
||||
w.series[ref] = &indexWriterSeries{
|
||||
labels: lset,
|
||||
chunks: chunks,
|
||||
if _, ok := w.seriesOffsets[ref]; ok {
|
||||
return errors.Errorf("series with reference %d already added", ref)
|
||||
}
|
||||
w.seriesOffsets[ref] = w.pos
|
||||
|
||||
w.buf2.reset()
|
||||
w.buf2.putUvarint(len(lset))
|
||||
|
||||
for _, l := range lset {
|
||||
offset, ok := w.symbols[l.Name]
|
||||
if !ok {
|
||||
return errors.Errorf("symbol entry for %q does not exist", l.Name)
|
||||
}
|
||||
w.buf2.putUvarint32(offset)
|
||||
|
||||
offset, ok = w.symbols[l.Value]
|
||||
if !ok {
|
||||
return errors.Errorf("symbol entry for %q does not exist", l.Value)
|
||||
}
|
||||
w.buf2.putUvarint32(offset)
|
||||
}
|
||||
|
||||
w.buf2.putUvarint(len(chunks))
|
||||
|
||||
for _, c := range chunks {
|
||||
w.buf2.putVarint64(c.MinTime)
|
||||
w.buf2.putVarint64(c.MaxTime)
|
||||
w.buf2.putUvarint64(c.Ref)
|
||||
}
|
||||
|
||||
w.buf1.reset()
|
||||
w.buf1.putUvarint(w.buf2.len())
|
||||
|
||||
w.buf2.putHash(w.crc32)
|
||||
|
||||
if err := w.write(w.buf1.get(), w.buf2.get()); err != nil {
|
||||
return errors.Wrap(err, "write series data")
|
||||
}
|
||||
|
||||
w.lastSeries = append(w.lastSeries[:0], lset...)
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (w *indexWriter) writeSymbols() error {
|
||||
func (w *indexWriter) AddSymbols(sym map[string]struct{}) error {
|
||||
if err := w.ensureStage(idxStageSymbols); err != nil {
|
||||
return err
|
||||
}
|
||||
// Generate sorted list of strings we will store as reference table.
|
||||
symbols := make([]string, 0, len(w.symbols))
|
||||
for s := range w.symbols {
|
||||
symbols := make([]string, 0, len(sym))
|
||||
|
||||
for s := range sym {
|
||||
symbols = append(symbols, s)
|
||||
}
|
||||
sort.Strings(symbols)
|
||||
|
@ -285,12 +332,14 @@ func (w *indexWriter) writeSymbols() error {
|
|||
|
||||
w.buf2.putBE32int(len(symbols))
|
||||
|
||||
w.symbols = make(map[string]uint32, len(symbols))
|
||||
|
||||
for _, s := range symbols {
|
||||
w.symbols[s] = uint32(w.pos) + headerSize + uint32(w.buf2.len())
|
||||
|
||||
// NOTE: len(s) gives the number of runes, not the number of bytes.
|
||||
// Therefore the read-back length for strings with unicode characters will
|
||||
// be off when not using putCstr.
|
||||
// be off when not using putUvarintStr.
|
||||
w.buf2.putUvarintStr(s)
|
||||
}
|
||||
|
||||
|
@ -301,55 +350,6 @@ func (w *indexWriter) writeSymbols() error {
|
|||
return errors.Wrap(err, "write symbols")
|
||||
}
|
||||
|
||||
func (w *indexWriter) writeSeries() error {
|
||||
// Series must be stored sorted along their labels.
|
||||
series := make(indexWriterSeriesSlice, 0, len(w.series))
|
||||
|
||||
for _, s := range w.series {
|
||||
series = append(series, s)
|
||||
}
|
||||
sort.Sort(series)
|
||||
|
||||
// Header holds number of series.
|
||||
w.buf1.reset()
|
||||
w.buf1.putBE32int(len(series))
|
||||
|
||||
if err := w.write(w.buf1.get()); err != nil {
|
||||
return errors.Wrap(err, "write series count")
|
||||
}
|
||||
|
||||
for _, s := range series {
|
||||
s.offset = uint32(w.pos)
|
||||
|
||||
w.buf2.reset()
|
||||
w.buf2.putUvarint(len(s.labels))
|
||||
|
||||
for _, l := range s.labels {
|
||||
w.buf2.putUvarint32(w.symbols[l.Name])
|
||||
w.buf2.putUvarint32(w.symbols[l.Value])
|
||||
}
|
||||
|
||||
w.buf2.putUvarint(len(s.chunks))
|
||||
|
||||
for _, c := range s.chunks {
|
||||
w.buf2.putVarint64(c.MinTime)
|
||||
w.buf2.putVarint64(c.MaxTime)
|
||||
w.buf2.putUvarint64(c.Ref)
|
||||
}
|
||||
|
||||
w.buf1.reset()
|
||||
w.buf1.putUvarint(w.buf2.len())
|
||||
|
||||
w.buf2.putHash(w.crc32)
|
||||
|
||||
if err := w.write(w.buf1.get(), w.buf2.get()); err != nil {
|
||||
return errors.Wrap(err, "write series data")
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (w *indexWriter) WriteLabelIndex(names []string, values []string) error {
|
||||
if len(values)%len(names) != 0 {
|
||||
return errors.Errorf("invalid value list length %d for %d names", len(values), len(names))
|
||||
|
@ -379,7 +379,11 @@ func (w *indexWriter) WriteLabelIndex(names []string, values []string) error {
|
|||
w.buf2.putBE32int(valt.Len())
|
||||
|
||||
for _, v := range valt.s {
|
||||
w.buf2.putBE32(w.symbols[v])
|
||||
offset, ok := w.symbols[v]
|
||||
if !ok {
|
||||
return errors.Errorf("symbol entry for %q does not exist", v)
|
||||
}
|
||||
w.buf2.putBE32(offset)
|
||||
}
|
||||
|
||||
w.buf1.reset()
|
||||
|
@ -450,11 +454,11 @@ func (w *indexWriter) WritePostings(name, value string, it Postings) error {
|
|||
refs := w.uint32s[:0]
|
||||
|
||||
for it.Next() {
|
||||
s, ok := w.series[it.At()]
|
||||
offset, ok := w.seriesOffsets[it.At()]
|
||||
if !ok {
|
||||
return errors.Errorf("series for reference %d not found", it.At())
|
||||
return errors.Errorf("%p series for reference %d not found", w, it.At())
|
||||
}
|
||||
refs = append(refs, s.offset)
|
||||
refs = append(refs, uint32(offset)) // XXX(fabxc): get uint64 vs uint32 sorted out.
|
||||
}
|
||||
if err := it.Err(); err != nil {
|
||||
return err
|
||||
|
@ -503,6 +507,10 @@ func (w *indexWriter) Close() error {
|
|||
|
||||
// IndexReader provides reading access of serialized index data.
|
||||
type IndexReader interface {
|
||||
// Symbols returns a set of string symbols that may occur in series' labels
|
||||
// and indices.
|
||||
Symbols() (map[string]struct{}, error)
|
||||
|
||||
// LabelValues returns the possible label values
|
||||
LabelValues(names ...string) (StringTuples, error)
|
||||
|
||||
|
@ -510,8 +518,13 @@ type IndexReader interface {
|
|||
// The Postings here contain the offsets to the series inside the index.
|
||||
Postings(name, value string) (Postings, error)
|
||||
|
||||
// Series returns the series for the given reference.
|
||||
Series(ref uint32) (labels.Labels, []*ChunkMeta, error)
|
||||
// SortedPostings returns a postings list that is reordered to be sorted
|
||||
// by the label set of the underlying series.
|
||||
SortedPostings(Postings) Postings
|
||||
|
||||
// Series populates the given labels and chunk metas for the series identified
|
||||
// by the reference.
|
||||
Series(ref uint32, lset *labels.Labels, chks *[]*ChunkMeta) error
|
||||
|
||||
// LabelIndices returns the label pairs for which indices exist.
|
||||
LabelIndices() ([][]string, error)
|
||||
|
@ -664,6 +677,21 @@ func (r *indexReader) lookupSymbol(o uint32) (string, error) {
|
|||
return s, nil
|
||||
}
|
||||
|
||||
func (r *indexReader) Symbols() (map[string]struct{}, error) {
|
||||
d1 := r.decbufAt(int(r.toc.symbols))
|
||||
d2 := d1.decbuf(d1.be32int())
|
||||
|
||||
count := d2.be32int()
|
||||
sym := make(map[string]struct{}, count)
|
||||
|
||||
for ; count > 0; count-- {
|
||||
s := d2.uvarintStr()
|
||||
sym[s] = struct{}{}
|
||||
}
|
||||
|
||||
return sym, d2.err()
|
||||
}
|
||||
|
||||
func (r *indexReader) LabelValues(names ...string) (StringTuples, error) {
|
||||
const sep = "\xff"
|
||||
|
||||
|
@ -712,36 +740,37 @@ func (r *indexReader) LabelIndices() ([][]string, error) {
|
|||
return res, nil
|
||||
}
|
||||
|
||||
func (r *indexReader) Series(ref uint32) (labels.Labels, []*ChunkMeta, error) {
|
||||
func (r *indexReader) Series(ref uint32, lbls *labels.Labels, chks *[]*ChunkMeta) error {
|
||||
d1 := r.decbufAt(int(ref))
|
||||
d2 := d1.decbuf(int(d1.uvarint()))
|
||||
|
||||
*lbls = (*lbls)[:0]
|
||||
*chks = (*chks)[:0]
|
||||
|
||||
k := int(d2.uvarint())
|
||||
lbls := make(labels.Labels, 0, k)
|
||||
|
||||
for i := 0; i < k; i++ {
|
||||
lno := uint32(d2.uvarint())
|
||||
lvo := uint32(d2.uvarint())
|
||||
|
||||
if d2.err() != nil {
|
||||
return nil, nil, errors.Wrap(d2.err(), "read series label offsets")
|
||||
return errors.Wrap(d2.err(), "read series label offsets")
|
||||
}
|
||||
|
||||
ln, err := r.lookupSymbol(lno)
|
||||
if err != nil {
|
||||
return nil, nil, errors.Wrap(err, "lookup label name")
|
||||
return errors.Wrap(err, "lookup label name")
|
||||
}
|
||||
lv, err := r.lookupSymbol(lvo)
|
||||
if err != nil {
|
||||
return nil, nil, errors.Wrap(err, "lookup label value")
|
||||
return errors.Wrap(err, "lookup label value")
|
||||
}
|
||||
|
||||
lbls = append(lbls, labels.Label{Name: ln, Value: lv})
|
||||
*lbls = append(*lbls, labels.Label{Name: ln, Value: lv})
|
||||
}
|
||||
|
||||
// Read the chunks meta data.
|
||||
k = int(d2.uvarint())
|
||||
chunks := make([]*ChunkMeta, 0, k)
|
||||
|
||||
for i := 0; i < k; i++ {
|
||||
mint := d2.varint64()
|
||||
|
@ -749,10 +778,10 @@ func (r *indexReader) Series(ref uint32) (labels.Labels, []*ChunkMeta, error) {
|
|||
off := d2.uvarint64()
|
||||
|
||||
if d2.err() != nil {
|
||||
return nil, nil, errors.Wrapf(d2.err(), "read meta for chunk %d", i)
|
||||
return errors.Wrapf(d2.err(), "read meta for chunk %d", i)
|
||||
}
|
||||
|
||||
chunks = append(chunks, &ChunkMeta{
|
||||
*chks = append(*chks, &ChunkMeta{
|
||||
Ref: off,
|
||||
MinTime: mint,
|
||||
MaxTime: maxt,
|
||||
|
@ -761,7 +790,7 @@ func (r *indexReader) Series(ref uint32) (labels.Labels, []*ChunkMeta, error) {
|
|||
|
||||
// TODO(fabxc): verify CRC32.
|
||||
|
||||
return lbls, chunks, nil
|
||||
return nil
|
||||
}
|
||||
|
||||
func (r *indexReader) Postings(name, value string) (Postings, error) {
|
||||
|
@ -787,6 +816,10 @@ func (r *indexReader) Postings(name, value string) (Postings, error) {
|
|||
return newBigEndianPostings(d2.get()), nil
|
||||
}
|
||||
|
||||
func (r *indexReader) SortedPostings(p Postings) Postings {
|
||||
return p
|
||||
}
|
||||
|
||||
type stringTuples struct {
|
||||
l int // tuple length
|
||||
s []string // flattened tuple entries
|
||||
|
|
130
index_test.go
130
index_test.go
|
@ -35,21 +35,31 @@ type series struct {
|
|||
type mockIndex struct {
|
||||
series map[uint32]series
|
||||
labelIndex map[string][]string
|
||||
postings map[labels.Label]Postings
|
||||
postings *memPostings
|
||||
symbols map[string]struct{}
|
||||
}
|
||||
|
||||
func newMockIndex() mockIndex {
|
||||
return mockIndex{
|
||||
series: make(map[uint32]series),
|
||||
labelIndex: make(map[string][]string),
|
||||
postings: make(map[labels.Label]Postings),
|
||||
postings: &memPostings{m: make(map[term][]uint32)},
|
||||
symbols: make(map[string]struct{}),
|
||||
}
|
||||
}
|
||||
|
||||
func (m mockIndex) Symbols() (map[string]struct{}, error) {
|
||||
return m.symbols, nil
|
||||
}
|
||||
|
||||
func (m mockIndex) AddSeries(ref uint32, l labels.Labels, chunks ...*ChunkMeta) error {
|
||||
if _, ok := m.series[ref]; ok {
|
||||
return errors.Errorf("series with reference %d already added", ref)
|
||||
}
|
||||
for _, lbl := range l {
|
||||
m.symbols[lbl.Name] = struct{}{}
|
||||
m.symbols[lbl.Value] = struct{}{}
|
||||
}
|
||||
|
||||
s := series{l: l}
|
||||
// Actual chunk data is not stored in the index.
|
||||
|
@ -75,41 +85,16 @@ func (m mockIndex) WriteLabelIndex(names []string, values []string) error {
|
|||
}
|
||||
|
||||
func (m mockIndex) WritePostings(name, value string, it Postings) error {
|
||||
lbl := labels.Label{
|
||||
Name: name,
|
||||
Value: value,
|
||||
if _, ok := m.postings.m[term{name, value}]; ok {
|
||||
return errors.Errorf("postings for %s=%q already added", name, value)
|
||||
}
|
||||
|
||||
type refdSeries struct {
|
||||
ref uint32
|
||||
series series
|
||||
}
|
||||
|
||||
// Re-Order so that the list is ordered by labels of the series.
|
||||
// Internally that is how the series are laid out.
|
||||
refs := make([]refdSeries, 0)
|
||||
for it.Next() {
|
||||
s, ok := m.series[it.At()]
|
||||
if !ok {
|
||||
return errors.Errorf("series for reference %d not found", it.At())
|
||||
}
|
||||
refs = append(refs, refdSeries{it.At(), s})
|
||||
}
|
||||
if err := it.Err(); err != nil {
|
||||
ep, err := expandPostings(it)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
m.postings.m[term{name, value}] = ep
|
||||
|
||||
sort.Slice(refs, func(i, j int) bool {
|
||||
return labels.Compare(refs[i].series.l, refs[j].series.l) < 0
|
||||
})
|
||||
|
||||
postings := make([]uint32, 0, len(refs))
|
||||
for _, r := range refs {
|
||||
postings = append(postings, r.ref)
|
||||
}
|
||||
|
||||
m.postings[lbl] = newListPostings(postings)
|
||||
return nil
|
||||
return it.Err()
|
||||
}
|
||||
|
||||
func (m mockIndex) Close() error {
|
||||
|
@ -126,26 +111,30 @@ func (m mockIndex) LabelValues(names ...string) (StringTuples, error) {
|
|||
}
|
||||
|
||||
func (m mockIndex) Postings(name, value string) (Postings, error) {
|
||||
lbl := labels.Label{
|
||||
Name: name,
|
||||
Value: value,
|
||||
}
|
||||
|
||||
p, ok := m.postings[lbl]
|
||||
if !ok {
|
||||
return nil, ErrNotFound
|
||||
}
|
||||
|
||||
return p, nil
|
||||
return m.postings.get(term{name, value}), nil
|
||||
}
|
||||
|
||||
func (m mockIndex) Series(ref uint32) (labels.Labels, []*ChunkMeta, error) {
|
||||
s, ok := m.series[ref]
|
||||
if !ok {
|
||||
return nil, nil, ErrNotFound
|
||||
func (m mockIndex) SortedPostings(p Postings) Postings {
|
||||
ep, err := expandPostings(p)
|
||||
if err != nil {
|
||||
return errPostings{err: errors.Wrap(err, "expand postings")}
|
||||
}
|
||||
|
||||
return s.l, s.chunks, nil
|
||||
sort.Slice(ep, func(i, j int) bool {
|
||||
return labels.Compare(m.series[ep[i]].l, m.series[ep[j]].l) < 0
|
||||
})
|
||||
return newListPostings(ep)
|
||||
}
|
||||
|
||||
func (m mockIndex) Series(ref uint32, lset *labels.Labels, chks *[]*ChunkMeta) error {
|
||||
s, ok := m.series[ref]
|
||||
if !ok {
|
||||
return ErrNotFound
|
||||
}
|
||||
*lset = append((*lset)[:0], s.l...)
|
||||
*chks = append((*chks)[:0], s.chunks...)
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (m mockIndex) LabelIndices() ([][]string, error) {
|
||||
|
@ -197,11 +186,21 @@ func TestIndexRW_Postings(t *testing.T) {
|
|||
labels.FromStrings("a", "1", "b", "4"),
|
||||
}
|
||||
|
||||
err = iw.AddSymbols(map[string]struct{}{
|
||||
"a": struct{}{},
|
||||
"b": struct{}{},
|
||||
"1": struct{}{},
|
||||
"2": struct{}{},
|
||||
"3": struct{}{},
|
||||
"4": struct{}{},
|
||||
})
|
||||
require.NoError(t, err)
|
||||
|
||||
// Postings lists are only written if a series with the respective
|
||||
// reference was added before.
|
||||
require.NoError(t, iw.AddSeries(1, series[0]))
|
||||
require.NoError(t, iw.AddSeries(3, series[2]))
|
||||
require.NoError(t, iw.AddSeries(2, series[1]))
|
||||
require.NoError(t, iw.AddSeries(3, series[2]))
|
||||
require.NoError(t, iw.AddSeries(4, series[3]))
|
||||
|
||||
err = iw.WritePostings("a", "1", newListPostings([]uint32{1, 2, 3, 4}))
|
||||
|
@ -215,8 +214,11 @@ func TestIndexRW_Postings(t *testing.T) {
|
|||
p, err := ir.Postings("a", "1")
|
||||
require.NoError(t, err)
|
||||
|
||||
var l labels.Labels
|
||||
var c []*ChunkMeta
|
||||
|
||||
for i := 0; p.Next(); i++ {
|
||||
l, c, err := ir.Series(p.At())
|
||||
err := ir.Series(p.At(), &l, &c)
|
||||
|
||||
require.NoError(t, err)
|
||||
require.Equal(t, 0, len(c))
|
||||
|
@ -235,6 +237,17 @@ func TestPersistence_index_e2e(t *testing.T) {
|
|||
lbls, err := readPrometheusLabels("testdata/20k.series", 20000)
|
||||
require.NoError(t, err)
|
||||
|
||||
// Sort labels as the index writer expects series in sorted order.
|
||||
sort.Sort(labels.Slice(lbls))
|
||||
|
||||
symbols := map[string]struct{}{}
|
||||
for _, lset := range lbls {
|
||||
for _, l := range lset {
|
||||
symbols[l.Name] = struct{}{}
|
||||
symbols[l.Value] = struct{}{}
|
||||
}
|
||||
}
|
||||
|
||||
var input indexWriterSeriesSlice
|
||||
|
||||
// Generate ChunkMetas for every label set.
|
||||
|
@ -258,6 +271,8 @@ func TestPersistence_index_e2e(t *testing.T) {
|
|||
iw, err := newIndexWriter(dir)
|
||||
require.NoError(t, err)
|
||||
|
||||
require.NoError(t, iw.AddSymbols(symbols))
|
||||
|
||||
// Population procedure as done by compaction.
|
||||
var (
|
||||
postings = &memPostings{m: make(map[term][]uint32, 512)}
|
||||
|
@ -311,21 +326,24 @@ func TestPersistence_index_e2e(t *testing.T) {
|
|||
ir, err := newIndexReader(dir)
|
||||
require.NoError(t, err)
|
||||
|
||||
for p := range mi.postings {
|
||||
gotp, err := ir.Postings(p.Name, p.Value)
|
||||
for p := range mi.postings.m {
|
||||
gotp, err := ir.Postings(p.name, p.value)
|
||||
require.NoError(t, err)
|
||||
|
||||
expp, err := mi.Postings(p.Name, p.Value)
|
||||
expp, err := mi.Postings(p.name, p.value)
|
||||
|
||||
var lset, explset labels.Labels
|
||||
var chks, expchks []*ChunkMeta
|
||||
|
||||
for gotp.Next() {
|
||||
require.True(t, expp.Next())
|
||||
|
||||
ref := gotp.At()
|
||||
|
||||
lset, chks, err := ir.Series(ref)
|
||||
err := ir.Series(ref, &lset, &chks)
|
||||
require.NoError(t, err)
|
||||
|
||||
explset, expchks, err := mi.Series(expp.At())
|
||||
err = mi.Series(expp.At(), &explset, &expchks)
|
||||
require.Equal(t, explset, lset)
|
||||
require.Equal(t, expchks, chks)
|
||||
}
|
||||
|
|
17
querier.go
17
querier.go
|
@ -15,7 +15,7 @@ package tsdb
|
|||
|
||||
import (
|
||||
"fmt"
|
||||
"sort"
|
||||
"sort"
|
||||
"strings"
|
||||
|
||||
"github.com/prometheus/tsdb/chunks"
|
||||
|
@ -134,8 +134,6 @@ type blockQuerier struct {
|
|||
chunks ChunkReader
|
||||
tombstones TombstoneReader
|
||||
|
||||
postingsMapper func(Postings) Postings
|
||||
|
||||
mint, maxt int64
|
||||
}
|
||||
|
||||
|
@ -144,10 +142,6 @@ func (q *blockQuerier) Select(ms ...labels.Matcher) SeriesSet {
|
|||
|
||||
p, absent := pr.Select(ms...)
|
||||
|
||||
if q.postingsMapper != nil {
|
||||
p = q.postingsMapper(p)
|
||||
}
|
||||
|
||||
return &blockSeriesSet{
|
||||
set: &populatedChunkSeries{
|
||||
set: &baseChunkSeries{
|
||||
|
@ -218,7 +212,7 @@ func (r *postingsReader) Select(ms ...labels.Matcher) (Postings, []string) {
|
|||
|
||||
p := Intersect(its...)
|
||||
|
||||
return p, absent
|
||||
return r.index.SortedPostings(p), absent
|
||||
}
|
||||
|
||||
// tuplesByPrefix uses binary search to find prefix matches within ts.
|
||||
|
@ -434,11 +428,14 @@ func (s *baseChunkSeries) At() (labels.Labels, []*ChunkMeta, intervals) {
|
|||
func (s *baseChunkSeries) Err() error { return s.err }
|
||||
|
||||
func (s *baseChunkSeries) Next() bool {
|
||||
var (
|
||||
lset labels.Labels
|
||||
chunks []*ChunkMeta
|
||||
)
|
||||
Outer:
|
||||
for s.p.Next() {
|
||||
ref := s.p.At()
|
||||
lset, chunks, err := s.index.Series(ref)
|
||||
if err != nil {
|
||||
if err := s.index.Series(ref, &lset, &chunks); err != nil {
|
||||
s.err = err
|
||||
return false
|
||||
}
|
||||
|
|
|
@ -432,18 +432,18 @@ func TestBlockQuerier(t *testing.T) {
|
|||
maxt: 6,
|
||||
ms: []labels.Matcher{labels.NewPrefixMatcher("p", "abc")},
|
||||
exp: newListSeriesSet([]Series{
|
||||
newSeries(map[string]string{
|
||||
"p": "abcd",
|
||||
"x": "xyz",
|
||||
},
|
||||
[]sample{{2, 3}, {3, 4}, {5, 2}, {6, 3}},
|
||||
),
|
||||
newSeries(map[string]string{
|
||||
"a": "ab",
|
||||
"p": "abce",
|
||||
},
|
||||
[]sample{{2, 2}, {3, 3}, {5, 3}, {6, 6}},
|
||||
),
|
||||
newSeries(map[string]string{
|
||||
"p": "abcd",
|
||||
"x": "xyz",
|
||||
},
|
||||
[]sample{{2, 3}, {3, 4}, {5, 2}, {6, 3}},
|
||||
),
|
||||
}),
|
||||
},
|
||||
},
|
||||
|
|
Loading…
Reference in New Issue