mirror of
https://github.com/prometheus/prometheus
synced 2025-01-14 02:43:35 +00:00
tsdb/{index,compact}: allow using custom postings encoding format (#13242)
* tsdb/{index,compact}: allow using custom postings encoding format We would like to experiment with a different postings encoding format in Thanos so in this change I am proposing adding another argument to `NewWriter` which would allow users to change the format if needed. Also, wire the leveled compactor so that it would be possible to change the format there too. Signed-off-by: Giedrius Statkevičius <giedrius.statkevicius@vinted.com> * tsdb/compact: use a struct for leveled compactor options As discussed on Slack, let's use a struct for the options in leveled compactor. Signed-off-by: Giedrius Statkevičius <giedrius.statkevicius@vinted.com> * tsdb: make changes after Bryan's review - Make changes less intrusive - Turn the postings encoder type into a function - Add NewWriterWithEncoder() Signed-off-by: Giedrius Statkevičius <giedrius.statkevicius@vinted.com> --------- Signed-off-by: Giedrius Statkevičius <giedrius.statkevicius@vinted.com>
This commit is contained in:
parent
775d955919
commit
61b4080a14
@ -82,6 +82,7 @@ type LeveledCompactor struct {
|
||||
ctx context.Context
|
||||
maxBlockChunkSegmentSize int64
|
||||
mergeFunc storage.VerticalChunkSeriesMergeFunc
|
||||
postingsEncoder index.PostingsEncoder
|
||||
}
|
||||
|
||||
type CompactorMetrics struct {
|
||||
@ -144,12 +145,30 @@ func newCompactorMetrics(r prometheus.Registerer) *CompactorMetrics {
|
||||
return m
|
||||
}
|
||||
|
||||
// NewLeveledCompactor returns a LeveledCompactor.
|
||||
func NewLeveledCompactor(ctx context.Context, r prometheus.Registerer, l log.Logger, ranges []int64, pool chunkenc.Pool, mergeFunc storage.VerticalChunkSeriesMergeFunc) (*LeveledCompactor, error) {
|
||||
return NewLeveledCompactorWithChunkSize(ctx, r, l, ranges, pool, chunks.DefaultChunkSegmentSize, mergeFunc)
|
||||
type LeveledCompactorOptions struct {
|
||||
// PE specifies the postings encoder. It is called when compactor is writing out the postings for a label name/value pair during compaction.
|
||||
// If it is nil then the default encoder is used. At the moment that is the "raw" encoder. See index.EncodePostingsRaw for more.
|
||||
PE index.PostingsEncoder
|
||||
// MaxBlockChunkSegmentSize is the max block chunk segment size. If it is 0 then the default chunks.DefaultChunkSegmentSize is used.
|
||||
MaxBlockChunkSegmentSize int64
|
||||
// MergeFunc is used for merging series together in vertical compaction. By default storage.NewCompactingChunkSeriesMerger(storage.ChainedSeriesMerge) is used.
|
||||
MergeFunc storage.VerticalChunkSeriesMergeFunc
|
||||
}
|
||||
|
||||
func NewLeveledCompactorWithChunkSize(ctx context.Context, r prometheus.Registerer, l log.Logger, ranges []int64, pool chunkenc.Pool, maxBlockChunkSegmentSize int64, mergeFunc storage.VerticalChunkSeriesMergeFunc) (*LeveledCompactor, error) {
|
||||
return NewLeveledCompactorWithOptions(ctx, r, l, ranges, pool, LeveledCompactorOptions{
|
||||
MaxBlockChunkSegmentSize: maxBlockChunkSegmentSize,
|
||||
MergeFunc: mergeFunc,
|
||||
})
|
||||
}
|
||||
|
||||
func NewLeveledCompactor(ctx context.Context, r prometheus.Registerer, l log.Logger, ranges []int64, pool chunkenc.Pool, mergeFunc storage.VerticalChunkSeriesMergeFunc) (*LeveledCompactor, error) {
|
||||
return NewLeveledCompactorWithOptions(ctx, r, l, ranges, pool, LeveledCompactorOptions{
|
||||
MergeFunc: mergeFunc,
|
||||
})
|
||||
}
|
||||
|
||||
func NewLeveledCompactorWithOptions(ctx context.Context, r prometheus.Registerer, l log.Logger, ranges []int64, pool chunkenc.Pool, opts LeveledCompactorOptions) (*LeveledCompactor, error) {
|
||||
if len(ranges) == 0 {
|
||||
return nil, fmt.Errorf("at least one range must be provided")
|
||||
}
|
||||
@ -159,9 +178,18 @@ func NewLeveledCompactorWithChunkSize(ctx context.Context, r prometheus.Register
|
||||
if l == nil {
|
||||
l = log.NewNopLogger()
|
||||
}
|
||||
if mergeFunc == nil {
|
||||
var mergeFunc storage.VerticalChunkSeriesMergeFunc
|
||||
if opts.MergeFunc == nil {
|
||||
mergeFunc = storage.NewCompactingChunkSeriesMerger(storage.ChainedSeriesMerge)
|
||||
}
|
||||
var maxBlockChunkSegmentSize int64
|
||||
if opts.MaxBlockChunkSegmentSize == 0 {
|
||||
maxBlockChunkSegmentSize = chunks.DefaultChunkSegmentSize
|
||||
}
|
||||
var pe index.PostingsEncoder
|
||||
if opts.PE == nil {
|
||||
pe = index.EncodePostingsRaw
|
||||
}
|
||||
return &LeveledCompactor{
|
||||
ranges: ranges,
|
||||
chunkPool: pool,
|
||||
@ -170,6 +198,7 @@ func NewLeveledCompactorWithChunkSize(ctx context.Context, r prometheus.Register
|
||||
ctx: ctx,
|
||||
maxBlockChunkSegmentSize: maxBlockChunkSegmentSize,
|
||||
mergeFunc: mergeFunc,
|
||||
postingsEncoder: pe,
|
||||
}, nil
|
||||
}
|
||||
|
||||
@ -599,7 +628,7 @@ func (c *LeveledCompactor) write(dest string, meta *BlockMeta, blockPopulator Bl
|
||||
}
|
||||
}
|
||||
|
||||
indexw, err := index.NewWriter(c.ctx, filepath.Join(tmp, indexFilename))
|
||||
indexw, err := index.NewWriterWithEncoder(c.ctx, filepath.Join(tmp, indexFilename), c.postingsEncoder)
|
||||
if err != nil {
|
||||
return fmt.Errorf("open index writer: %w", err)
|
||||
}
|
||||
|
@ -442,8 +442,7 @@ func (db *DBReadOnly) FlushWAL(dir string) (returnErr error) {
|
||||
nil,
|
||||
db.logger,
|
||||
ExponentialBlockRanges(DefaultOptions().MinBlockDuration, 3, 5),
|
||||
chunkenc.NewPool(),
|
||||
nil,
|
||||
chunkenc.NewPool(), nil,
|
||||
)
|
||||
if err != nil {
|
||||
return fmt.Errorf("create leveled compactor: %w", err)
|
||||
|
@ -110,6 +110,8 @@ type symbolCacheEntry struct {
|
||||
lastValue string
|
||||
}
|
||||
|
||||
type PostingsEncoder func(*encoding.Encbuf, []uint32) error
|
||||
|
||||
// Writer implements the IndexWriter interface for the standard
|
||||
// serialization format.
|
||||
type Writer struct {
|
||||
@ -148,6 +150,8 @@ type Writer struct {
|
||||
crc32 hash.Hash
|
||||
|
||||
Version int
|
||||
|
||||
postingsEncoder PostingsEncoder
|
||||
}
|
||||
|
||||
// TOC represents index Table Of Content that states where each section of index starts.
|
||||
@ -186,7 +190,8 @@ func NewTOCFromByteSlice(bs ByteSlice) (*TOC, error) {
|
||||
}
|
||||
|
||||
// NewWriter returns a new Writer to the given filename. It serializes data in format version 2.
|
||||
func NewWriter(ctx context.Context, fn string) (*Writer, error) {
|
||||
// It uses the given encoder to encode each postings list.
|
||||
func NewWriterWithEncoder(ctx context.Context, fn string, encoder PostingsEncoder) (*Writer, error) {
|
||||
dir := filepath.Dir(fn)
|
||||
|
||||
df, err := fileutil.OpenDir(dir)
|
||||
@ -229,9 +234,10 @@ func NewWriter(ctx context.Context, fn string) (*Writer, error) {
|
||||
buf1: encoding.Encbuf{B: make([]byte, 0, 1<<22)},
|
||||
buf2: encoding.Encbuf{B: make([]byte, 0, 1<<22)},
|
||||
|
||||
symbolCache: make(map[string]symbolCacheEntry, 1<<8),
|
||||
labelNames: make(map[string]uint64, 1<<8),
|
||||
crc32: newCRC32(),
|
||||
symbolCache: make(map[string]symbolCacheEntry, 1<<8),
|
||||
labelNames: make(map[string]uint64, 1<<8),
|
||||
crc32: newCRC32(),
|
||||
postingsEncoder: encoder,
|
||||
}
|
||||
if err := iw.writeMeta(); err != nil {
|
||||
return nil, err
|
||||
@ -239,6 +245,12 @@ func NewWriter(ctx context.Context, fn string) (*Writer, error) {
|
||||
return iw, nil
|
||||
}
|
||||
|
||||
// NewWriter creates a new index writer using the default encoder. See
|
||||
// NewWriterWithEncoder.
|
||||
func NewWriter(ctx context.Context, fn string) (*Writer, error) {
|
||||
return NewWriterWithEncoder(ctx, fn, EncodePostingsRaw)
|
||||
}
|
||||
|
||||
func (w *Writer) write(bufs ...[]byte) error {
|
||||
return w.f.Write(bufs...)
|
||||
}
|
||||
@ -941,6 +953,20 @@ func (w *Writer) writePostingsToTmpFiles() error {
|
||||
return nil
|
||||
}
|
||||
|
||||
// EncodePostingsRaw uses the "basic" postings list encoding format with no compression:
|
||||
// <BE uint32 len X><BE uint32 0><BE uint32 1>...<BE uint32 X-1>.
|
||||
func EncodePostingsRaw(e *encoding.Encbuf, offs []uint32) error {
|
||||
e.PutBE32int(len(offs))
|
||||
|
||||
for _, off := range offs {
|
||||
if off > (1<<32)-1 {
|
||||
return fmt.Errorf("series offset %d exceeds 4 bytes", off)
|
||||
}
|
||||
e.PutBE32(off)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (w *Writer) writePosting(name, value string, offs []uint32) error {
|
||||
// Align beginning to 4 bytes for more efficient postings list scans.
|
||||
if err := w.fP.AddPadding(4); err != nil {
|
||||
@ -959,13 +985,8 @@ func (w *Writer) writePosting(name, value string, offs []uint32) error {
|
||||
w.cntPO++
|
||||
|
||||
w.buf1.Reset()
|
||||
w.buf1.PutBE32int(len(offs))
|
||||
|
||||
for _, off := range offs {
|
||||
if off > (1<<32)-1 {
|
||||
return fmt.Errorf("series offset %d exceeds 4 bytes", off)
|
||||
}
|
||||
w.buf1.PutBE32(off)
|
||||
if err := w.postingsEncoder(&w.buf1, offs); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
w.buf2.Reset()
|
||||
|
Loading…
Reference in New Issue
Block a user