From 8a08f452b6a66698baf45f32beac2f1f54d52f20 Mon Sep 17 00:00:00 2001
From: Ben Ye
Date: Tue, 4 Jun 2024 16:11:36 -0700
Subject: [PATCH] tsdb: Allow passing a custom compactor to override the
 default one (#14113)

* expose hook in tsdb to allow customizing compactor

Signed-off-by: Ben Ye

* address comment

Signed-off-by: Ben Ye

---------

Signed-off-by: Ben Ye
---
 tsdb/db.go      | 19 ++++++++++++++-----
 tsdb/db_test.go | 32 ++++++++++++++++++++++++++++++++
 2 files changed, 46 insertions(+), 5 deletions(-)

diff --git a/tsdb/db.go b/tsdb/db.go
index bca3c9948..5651b403e 100644
--- a/tsdb/db.go
+++ b/tsdb/db.go
@@ -189,8 +189,13 @@ type Options struct {
 
 	// EnableSharding enables query sharding support in TSDB.
 	EnableSharding bool
+
+	// NewCompactorFunc is a function that returns a TSDB compactor.
+	NewCompactorFunc NewCompactorFunc
 }
 
+type NewCompactorFunc func(ctx context.Context, r prometheus.Registerer, l log.Logger, ranges []int64, pool chunkenc.Pool, opts *Options) (Compactor, error)
+
 type BlocksToDeleteFunc func(blocks []*Block) map[ulid.ULID]struct{}
 
 // DB handles reads and writes of time series falling into
@@ -851,13 +856,17 @@ func open(dir string, l log.Logger, r prometheus.Registerer, opts *Options, rngs
 	}
 
 	ctx, cancel := context.WithCancel(context.Background())
-	db.compactor, err = NewLeveledCompactorWithOptions(ctx, r, l, rngs, db.chunkPool, LeveledCompactorOptions{
-		MaxBlockChunkSegmentSize:    opts.MaxBlockChunkSegmentSize,
-		EnableOverlappingCompaction: opts.EnableOverlappingCompaction,
-	})
+	if opts.NewCompactorFunc != nil {
+		db.compactor, err = opts.NewCompactorFunc(ctx, r, l, rngs, db.chunkPool, opts)
+	} else {
+		db.compactor, err = NewLeveledCompactorWithOptions(ctx, r, l, rngs, db.chunkPool, LeveledCompactorOptions{
+			MaxBlockChunkSegmentSize:    opts.MaxBlockChunkSegmentSize,
+			EnableOverlappingCompaction: opts.EnableOverlappingCompaction,
+		})
+	}
 	if err != nil {
 		cancel()
-		return nil, fmt.Errorf("create leveled compactor: %w", err)
+		return nil, fmt.Errorf("create compactor: %w", err)
 	}
 	db.compactCancel = cancel
 
diff --git a/tsdb/db_test.go b/tsdb/db_test.go
index f0d672fad..69c9f60e3 100644
--- a/tsdb/db_test.go
+++ b/tsdb/db_test.go
@@ -7125,3 +7125,35 @@ func TestAbortBlockCompactions(t *testing.T) {
 	require.True(t, db.head.compactable(), "head should be compactable")
 	require.Equal(t, 4, compactions, "expected 4 compactions to be completed")
 }
+
+func TestNewCompactorFunc(t *testing.T) {
+	opts := DefaultOptions()
+	block1 := ulid.MustNew(1, nil)
+	block2 := ulid.MustNew(2, nil)
+	opts.NewCompactorFunc = func(ctx context.Context, r prometheus.Registerer, l log.Logger, ranges []int64, pool chunkenc.Pool, opts *Options) (Compactor, error) {
+		return &mockCompactorFn{
+			planFn: func() ([]string, error) {
+				return []string{block1.String(), block2.String()}, nil
+			},
+			compactFn: func() (ulid.ULID, error) {
+				return block1, nil
+			},
+			writeFn: func() (ulid.ULID, error) {
+				return block2, nil
+			},
+		}, nil
+	}
+	db := openTestDB(t, opts, nil)
+	defer func() {
+		require.NoError(t, db.Close())
+	}()
+	plans, err := db.compactor.Plan("")
+	require.NoError(t, err)
+	require.Equal(t, []string{block1.String(), block2.String()}, plans)
+	ulid, err := db.compactor.Compact("", nil, nil)
+	require.NoError(t, err)
+	require.Equal(t, block1, ulid)
+	ulid, err = db.compactor.Write("", nil, 0, 1, nil)
+	require.NoError(t, err)
+	require.Equal(t, block2, ulid)
+}
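
Usage note (not part of the patch): a downstream project embedding TSDB can supply
its own compactor through the new hook. Below is a minimal sketch of one assumed
way to wire it up; loggingCompactor is hypothetical, while Options.NewCompactorFunc,
NewLeveledCompactorWithOptions, and LeveledCompactorOptions come from the diff above.

package main

import (
	"context"

	"github.com/go-kit/log"
	"github.com/go-kit/log/level"
	"github.com/prometheus/client_golang/prometheus"

	"github.com/prometheus/prometheus/tsdb"
	"github.com/prometheus/prometheus/tsdb/chunkenc"
)

// loggingCompactor is a hypothetical wrapper: it embeds the default leveled
// compactor and instruments Plan, inheriting Compact and Write unchanged.
type loggingCompactor struct {
	*tsdb.LeveledCompactor
	logger log.Logger
}

func (c *loggingCompactor) Plan(dir string) ([]string, error) {
	dirs, err := c.LeveledCompactor.Plan(dir)
	level.Info(c.logger).Log("msg", "compaction planned", "candidates", len(dirs))
	return dirs, err
}

func main() {
	logger := log.NewNopLogger()
	opts := tsdb.DefaultOptions()
	// The hook receives the same arguments the default construction uses,
	// so a custom compactor can still build on the leveled one.
	opts.NewCompactorFunc = func(ctx context.Context, r prometheus.Registerer, l log.Logger, ranges []int64, pool chunkenc.Pool, o *tsdb.Options) (tsdb.Compactor, error) {
		inner, err := tsdb.NewLeveledCompactorWithOptions(ctx, r, l, ranges, pool, tsdb.LeveledCompactorOptions{
			MaxBlockChunkSegmentSize:    o.MaxBlockChunkSegmentSize,
			EnableOverlappingCompaction: o.EnableOverlappingCompaction,
		})
		if err != nil {
			return nil, err
		}
		return &loggingCompactor{LeveledCompactor: inner, logger: l}, nil
	}

	db, err := tsdb.Open("data", logger, prometheus.NewRegistry(), opts, nil)
	if err != nil {
		panic(err)
	}
	defer db.Close()
}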
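
One note on the test: mockCompactorFn is a helper that already exists elsewhere
in db_test.go and is not shown in this diff. From the way the test constructs it,
its assumed shape is roughly the following, with each Compactor method delegating
to the matching function field:

package tsdb

import "github.com/oklog/ulid"

// Assumed shape of the mockCompactorFn helper used by TestNewCompactorFunc;
// the real definition lives elsewhere in db_test.go.
type mockCompactorFn struct {
	planFn    func() ([]string, error)
	compactFn func() (ulid.ULID, error)
	writeFn   func() (ulid.ULID, error)
}

func (c *mockCompactorFn) Plan(_ string) ([]string, error) {
	return c.planFn()
}

func (c *mockCompactorFn) Compact(_ string, _ []string, _ []*Block) (ulid.ULID, error) {
	return c.compactFn()
}

func (c *mockCompactorFn) Write(_ string, _ BlockReader, _, _ int64, _ *BlockMeta) (ulid.ULID, error) {
	return c.writeFn()
}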