Stop compactions if there's a block to write (#13754)
* Stop compactions if there's a block to write db.Compact() checks if there's a block to write with HEAD chunks before calling db.compactBlocks(). This is to ensure that if we need to write a block then it happens ASAP, otherwise memory usage might keep growing. But what can also happen is that we don't need to write any block, we start db.compactBlocks(), compaction takes hours, and in the meantime HEAD needs to write out chunks to a block. This can be especially problematic if, for example, you run Thanos sidecar that's uploading block, which requires that compactions are disabled. Then you disable Thanos sidecar and re-enable compactions. When db.compactBlocks() is finally called it might have a huge number of blocks to compact, which might take a very long time, during which HEAD cannot write out chunks to a new block. In such case memory usage will keep growing until either: - compactions are finally finished and HEAD can write a block - we run out of memory and Prometheus gets OOM-killed This change adds a check for pending HEAD block writes inside db.compactBlocks(), so that we bail out early if there are still compactions to run, but we also need to write a new block. Also add a test for compactBlocks. --------- Signed-off-by: Łukasz Mierzwa <l.mierzwa@gmail.com> Signed-off-by: Lukasz Mierzwa <lukasz@cloudflare.com>
This commit is contained in:
parent
fc567a1df8
commit
277f04f0c4
|
@ -1365,6 +1365,14 @@ func (db *DB) compactHead(head *RangeHead) error {
|
|||
func (db *DB) compactBlocks() (err error) {
|
||||
// Check for compactions of multiple blocks.
|
||||
for {
|
||||
// If we have a lot of blocks to compact the whole process might take
|
||||
// long enough that we end up with a HEAD block that needs to be written.
|
||||
// Check if that's the case and stop compactions early.
|
||||
if db.head.compactable() {
|
||||
level.Warn(db.logger).Log("msg", "aborting block compactions to persit the head block")
|
||||
return nil
|
||||
}
|
||||
|
||||
plan, err := db.compactor.Plan(db.dir)
|
||||
if err != nil {
|
||||
return fmt.Errorf("plan compaction: %w", err)
|
||||
|
|
|
@ -6986,3 +6986,63 @@ func requireEqualOOOSamples(t *testing.T, expectedSamples int, db *DB) {
|
|||
prom_testutil.ToFloat64(db.head.metrics.outOfOrderSamplesAppended.WithLabelValues(sampleMetricTypeFloat)),
|
||||
"number of ooo appended samples mismatch")
|
||||
}
|
||||
|
||||
type mockCompactorFn struct {
|
||||
planFn func() ([]string, error)
|
||||
compactFn func() (ulid.ULID, error)
|
||||
writeFn func() (ulid.ULID, error)
|
||||
}
|
||||
|
||||
func (c *mockCompactorFn) Plan(_ string) ([]string, error) {
|
||||
return c.planFn()
|
||||
}
|
||||
|
||||
func (c *mockCompactorFn) Compact(_ string, _ []string, _ []*Block) (ulid.ULID, error) {
|
||||
return c.compactFn()
|
||||
}
|
||||
|
||||
func (c *mockCompactorFn) Write(_ string, _ BlockReader, _, _ int64, _ *BlockMeta) (ulid.ULID, error) {
|
||||
return c.writeFn()
|
||||
}
|
||||
|
||||
// Regression test for https://github.com/prometheus/prometheus/pull/13754
|
||||
func TestAbortBlockCompactions(t *testing.T) {
|
||||
// Create a test DB
|
||||
db := openTestDB(t, nil, nil)
|
||||
defer func() {
|
||||
require.NoError(t, db.Close())
|
||||
}()
|
||||
// It should NOT be compactible at the beginning of the test
|
||||
require.False(t, db.head.compactable(), "head should NOT be compactable")
|
||||
|
||||
// Track the number of compactions run inside db.compactBlocks()
|
||||
var compactions int
|
||||
|
||||
// Use a mock compactor with custom Plan() implementation
|
||||
db.compactor = &mockCompactorFn{
|
||||
planFn: func() ([]string, error) {
|
||||
// On every Plan() run increment compactions. After 4 compactions
|
||||
// update HEAD to make it compactible to force an exit from db.compactBlocks() loop.
|
||||
compactions++
|
||||
if compactions > 3 {
|
||||
chunkRange := db.head.chunkRange.Load()
|
||||
db.head.minTime.Store(0)
|
||||
db.head.maxTime.Store(chunkRange * 2)
|
||||
require.True(t, db.head.compactable(), "head should be compactable")
|
||||
}
|
||||
// Our custom Plan() will always return something to compact.
|
||||
return []string{"1", "2", "3"}, nil
|
||||
},
|
||||
compactFn: func() (ulid.ULID, error) {
|
||||
return ulid.ULID{}, nil
|
||||
},
|
||||
writeFn: func() (ulid.ULID, error) {
|
||||
return ulid.ULID{}, nil
|
||||
},
|
||||
}
|
||||
|
||||
err := db.Compact(context.Background())
|
||||
require.NoError(t, err)
|
||||
require.True(t, db.head.compactable(), "head should be compactable")
|
||||
require.Equal(t, 4, compactions, "expected 4 compactions to be completed")
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue