diff --git a/tsdb/compact.go b/tsdb/compact.go index 7b4ff9c1b..c5bd2ed2a 100644 --- a/tsdb/compact.go +++ b/tsdb/compact.go @@ -82,6 +82,7 @@ type LeveledCompactor struct { ctx context.Context maxBlockChunkSegmentSize int64 mergeFunc storage.VerticalChunkSeriesMergeFunc + postingsEncoder index.PostingsEncoder } type CompactorMetrics struct { @@ -144,12 +145,30 @@ func newCompactorMetrics(r prometheus.Registerer) *CompactorMetrics { return m } -// NewLeveledCompactor returns a LeveledCompactor. -func NewLeveledCompactor(ctx context.Context, r prometheus.Registerer, l log.Logger, ranges []int64, pool chunkenc.Pool, mergeFunc storage.VerticalChunkSeriesMergeFunc) (*LeveledCompactor, error) { - return NewLeveledCompactorWithChunkSize(ctx, r, l, ranges, pool, chunks.DefaultChunkSegmentSize, mergeFunc) +type LeveledCompactorOptions struct { + // PE specifies the postings encoder. It is called when compactor is writing out the postings for a label name/value pair during compaction. + // If it is nil then the default encoder is used. At the moment that is the "raw" encoder. See index.EncodePostingsRaw for more. + PE index.PostingsEncoder + // MaxBlockChunkSegmentSize is the max block chunk segment size. If it is 0 then the default chunks.DefaultChunkSegmentSize is used. + MaxBlockChunkSegmentSize int64 + // MergeFunc is used for merging series together in vertical compaction. By default storage.NewCompactingChunkSeriesMerger(storage.ChainedSeriesMerge) is used. 
+ MergeFunc storage.VerticalChunkSeriesMergeFunc } func NewLeveledCompactorWithChunkSize(ctx context.Context, r prometheus.Registerer, l log.Logger, ranges []int64, pool chunkenc.Pool, maxBlockChunkSegmentSize int64, mergeFunc storage.VerticalChunkSeriesMergeFunc) (*LeveledCompactor, error) { + return NewLeveledCompactorWithOptions(ctx, r, l, ranges, pool, LeveledCompactorOptions{ + MaxBlockChunkSegmentSize: maxBlockChunkSegmentSize, + MergeFunc: mergeFunc, + }) +} + +func NewLeveledCompactor(ctx context.Context, r prometheus.Registerer, l log.Logger, ranges []int64, pool chunkenc.Pool, mergeFunc storage.VerticalChunkSeriesMergeFunc) (*LeveledCompactor, error) { + return NewLeveledCompactorWithOptions(ctx, r, l, ranges, pool, LeveledCompactorOptions{ + MergeFunc: mergeFunc, + }) +} + +func NewLeveledCompactorWithOptions(ctx context.Context, r prometheus.Registerer, l log.Logger, ranges []int64, pool chunkenc.Pool, opts LeveledCompactorOptions) (*LeveledCompactor, error) { if len(ranges) == 0 { return nil, fmt.Errorf("at least one range must be provided") } @@ -159,9 +178,18 @@ func NewLeveledCompactorWithChunkSize(ctx context.Context, r prometheus.Register if l == nil { l = log.NewNopLogger() } - if mergeFunc == nil { + mergeFunc := opts.MergeFunc + if mergeFunc == nil { mergeFunc = storage.NewCompactingChunkSeriesMerger(storage.ChainedSeriesMerge) } + maxBlockChunkSegmentSize := opts.MaxBlockChunkSegmentSize + if maxBlockChunkSegmentSize == 0 { + maxBlockChunkSegmentSize = chunks.DefaultChunkSegmentSize + } + pe := opts.PE + if pe == nil { + pe = index.EncodePostingsRaw + } return &LeveledCompactor{ ranges: ranges, chunkPool: pool, @@ -170,6 +198,7 @@ func NewLeveledCompactorWithChunkSize(ctx context.Context, r prometheus.Register ctx: ctx, maxBlockChunkSegmentSize: maxBlockChunkSegmentSize, mergeFunc: mergeFunc, + postingsEncoder: pe, }, nil } @@ -599,7 +628,7 @@ func (c *LeveledCompactor) write(dest string, meta *BlockMeta, 
blockPopulator Bl } } - indexw, err := index.NewWriter(c.ctx, filepath.Join(tmp, indexFilename)) + indexw, err := index.NewWriterWithEncoder(c.ctx, filepath.Join(tmp, indexFilename), c.postingsEncoder) if err != nil { return fmt.Errorf("open index writer: %w", err) } diff --git a/tsdb/db.go b/tsdb/db.go index b2cc37a19..bdeb3faee 100644 --- a/tsdb/db.go +++ b/tsdb/db.go @@ -442,8 +442,7 @@ func (db *DBReadOnly) FlushWAL(dir string) (returnErr error) { nil, db.logger, ExponentialBlockRanges(DefaultOptions().MinBlockDuration, 3, 5), - chunkenc.NewPool(), - nil, + chunkenc.NewPool(), nil, ) if err != nil { return fmt.Errorf("create leveled compactor: %w", err) diff --git a/tsdb/index/index.go b/tsdb/index/index.go index 2b025a352..c2ca581f7 100644 --- a/tsdb/index/index.go +++ b/tsdb/index/index.go @@ -110,6 +110,8 @@ type symbolCacheEntry struct { lastValue string } +type PostingsEncoder func(*encoding.Encbuf, []uint32) error + // Writer implements the IndexWriter interface for the standard // serialization format. type Writer struct { @@ -148,6 +150,8 @@ type Writer struct { crc32 hash.Hash Version int + + postingsEncoder PostingsEncoder } // TOC represents index Table Of Content that states where each section of index starts. @@ -186,7 +190,8 @@ func NewTOCFromByteSlice(bs ByteSlice) (*TOC, error) { } -// NewWriter returns a new Writer to the given filename. It serializes data in format version 2. -func NewWriter(ctx context.Context, fn string) (*Writer, error) { +// NewWriterWithEncoder returns a new Writer to the given filename. It serializes data in format version 2. +// It uses the given encoder to encode each postings list. 
+func NewWriterWithEncoder(ctx context.Context, fn string, encoder PostingsEncoder) (*Writer, error) { dir := filepath.Dir(fn) df, err := fileutil.OpenDir(dir) @@ -229,9 +234,10 @@ func NewWriter(ctx context.Context, fn string) (*Writer, error) { buf1: encoding.Encbuf{B: make([]byte, 0, 1<<22)}, buf2: encoding.Encbuf{B: make([]byte, 0, 1<<22)}, - symbolCache: make(map[string]symbolCacheEntry, 1<<8), - labelNames: make(map[string]uint64, 1<<8), - crc32: newCRC32(), + symbolCache: make(map[string]symbolCacheEntry, 1<<8), + labelNames: make(map[string]uint64, 1<<8), + crc32: newCRC32(), + postingsEncoder: encoder, } if err := iw.writeMeta(); err != nil { return nil, err @@ -239,6 +245,12 @@ func NewWriter(ctx context.Context, fn string) (*Writer, error) { return iw, nil } +// NewWriter creates a new index writer using the default encoder. See +// NewWriterWithEncoder. +func NewWriter(ctx context.Context, fn string) (*Writer, error) { + return NewWriterWithEncoder(ctx, fn, EncodePostingsRaw) +} + func (w *Writer) write(bufs ...[]byte) error { return w.f.Write(bufs...) } @@ -941,6 +953,20 @@ func (w *Writer) writePostingsToTmpFiles() error { return nil } +// EncodePostingsRaw uses the "basic" postings list encoding format with no compression: +// <BE uint32 len X><BE uint32 0><BE uint32 1>...<BE uint32 X-1>. +func EncodePostingsRaw(e *encoding.Encbuf, offs []uint32) error { + e.PutBE32int(len(offs)) + + for _, off := range offs { + if off > (1<<32)-1 { + return fmt.Errorf("series offset %d exceeds 4 bytes", off) + } + e.PutBE32(off) + } + return nil +} + func (w *Writer) writePosting(name, value string, offs []uint32) error { // Align beginning to 4 bytes for more efficient postings list scans. 
if err := w.fP.AddPadding(4); err != nil { @@ -959,13 +985,8 @@ func (w *Writer) writePosting(name, value string, offs []uint32) error { w.cntPO++ w.buf1.Reset() - w.buf1.PutBE32int(len(offs)) - - for _, off := range offs { - if off > (1<<32)-1 { - return fmt.Errorf("series offset %d exceeds 4 bytes", off) - } - w.buf1.PutBE32(off) + if err := w.postingsEncoder(&w.buf1, offs); err != nil { + return err } w.buf2.Reset()