From 3a48adc54f1d77dd3956cc0f12c33b8f3454c6af Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Giedrius=20Statkevi=C4=8Dius?= Date: Fri, 12 Jan 2024 09:56:44 +0200 Subject: [PATCH] tsdb: add enable overlapping compaction MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit This functionality is needed in downstream projects because they have a separate component that does compaction. Upstreaming https://github.com/grafana/mimir-prometheus/blob/7c8e9a2a76fc729e9078889782928b2fdfe240e9/tsdb/compact.go#L323-L325. Signed-off-by: Giedrius Statkevičius --- tsdb/compact.go | 48 +++++++++++++++++++++++++++++------------------- 1 file changed, 29 insertions(+), 19 deletions(-) diff --git a/tsdb/compact.go b/tsdb/compact.go index c5bd2ed2a..b4c194de9 100644 --- a/tsdb/compact.go +++ b/tsdb/compact.go @@ -75,14 +75,15 @@ type Compactor interface { // LeveledCompactor implements the Compactor interface. type LeveledCompactor struct { - metrics *CompactorMetrics - logger log.Logger - ranges []int64 - chunkPool chunkenc.Pool - ctx context.Context - maxBlockChunkSegmentSize int64 - mergeFunc storage.VerticalChunkSeriesMergeFunc - postingsEncoder index.PostingsEncoder + metrics *CompactorMetrics + logger log.Logger + ranges []int64 + chunkPool chunkenc.Pool + ctx context.Context + maxBlockChunkSegmentSize int64 + mergeFunc storage.VerticalChunkSeriesMergeFunc + postingsEncoder index.PostingsEncoder + enableOverlappingCompaction bool } type CompactorMetrics struct { @@ -153,18 +154,23 @@ type LeveledCompactorOptions struct { MaxBlockChunkSegmentSize int64 // MergeFunc is used for merging series together in vertical compaction. By default storage.NewCompactingChunkSeriesMerger(storage.ChainedSeriesMerge) is used. MergeFunc storage.VerticalChunkSeriesMergeFunc + // EnableOverlappingCompaction enables compaction of overlapping blocks. In Prometheus it is always enabled. 
+ // Disabling it is useful for downstream projects like Mimir, Cortex, and Thanos, which have a separate component that does compaction. + EnableOverlappingCompaction bool } func NewLeveledCompactorWithChunkSize(ctx context.Context, r prometheus.Registerer, l log.Logger, ranges []int64, pool chunkenc.Pool, maxBlockChunkSegmentSize int64, mergeFunc storage.VerticalChunkSeriesMergeFunc) (*LeveledCompactor, error) { return NewLeveledCompactorWithOptions(ctx, r, l, ranges, pool, LeveledCompactorOptions{ - MaxBlockChunkSegmentSize: maxBlockChunkSegmentSize, - MergeFunc: mergeFunc, + MaxBlockChunkSegmentSize: maxBlockChunkSegmentSize, + MergeFunc: mergeFunc, + EnableOverlappingCompaction: true, }) } func NewLeveledCompactor(ctx context.Context, r prometheus.Registerer, l log.Logger, ranges []int64, pool chunkenc.Pool, mergeFunc storage.VerticalChunkSeriesMergeFunc) (*LeveledCompactor, error) { return NewLeveledCompactorWithOptions(ctx, r, l, ranges, pool, LeveledCompactorOptions{ - MergeFunc: mergeFunc, + MergeFunc: mergeFunc, + EnableOverlappingCompaction: true, }) } @@ -191,14 +197,15 @@ func NewLeveledCompactorWithOptions(ctx context.Context, r prometheus.Registerer pe = index.EncodePostingsRaw } return &LeveledCompactor{ - ranges: ranges, - chunkPool: pool, - logger: l, - metrics: newCompactorMetrics(r), - ctx: ctx, - maxBlockChunkSegmentSize: maxBlockChunkSegmentSize, - mergeFunc: mergeFunc, - postingsEncoder: pe, + ranges: ranges, + chunkPool: pool, + logger: l, + metrics: newCompactorMetrics(r), + ctx: ctx, + maxBlockChunkSegmentSize: maxBlockChunkSegmentSize, + mergeFunc: mergeFunc, + postingsEncoder: pe, + enableOverlappingCompaction: opts.EnableOverlappingCompaction, }, nil } @@ -317,6 +324,9 @@ func (c *LeveledCompactor) selectDirs(ds []dirMeta) []dirMeta { // selectOverlappingDirs returns all dirs with overlapping time ranges. // It expects sorted input by mint and returns the overlapping dirs in the same order as received. 
func (c *LeveledCompactor) selectOverlappingDirs(ds []dirMeta) []string { + if !c.enableOverlappingCompaction { + return nil + } if len(ds) < 2 { return nil }