mirror of
https://github.com/prometheus/prometheus
synced 2025-01-12 09:40:00 +00:00
Fix race condition in ChunkDiskMapper.Truncate() (#12500)
* Fix race condition in ChunkDiskMapper.Truncate() Signed-off-by: Marco Pracucci <marco@pracucci.com> * Added unit test Signed-off-by: Marco Pracucci <marco@pracucci.com> * Update tsdb/chunks/head_chunks.go Co-authored-by: Ganesh Vernekar <ganeshvern@gmail.com> Signed-off-by: Marco Pracucci <marco@pracucci.com> --------- Signed-off-by: Marco Pracucci <marco@pracucci.com> Co-authored-by: Ganesh Vernekar <ganeshvern@gmail.com>
This commit is contained in:
parent
71d149a79f
commit
031d22df9e
@ -948,12 +948,22 @@ func (cdm *ChunkDiskMapper) Truncate(fileNo uint32) error {
|
|||||||
if len(chkFileIndices) == len(removedFiles) {
|
if len(chkFileIndices) == len(removedFiles) {
|
||||||
// All files were deleted. Reset the current sequence.
|
// All files were deleted. Reset the current sequence.
|
||||||
cdm.evtlPosMtx.Lock()
|
cdm.evtlPosMtx.Lock()
|
||||||
if err == nil {
|
|
||||||
cdm.evtlPos.setSeq(0)
|
// We can safely reset the sequence only if the write queue is empty. If it's not empty,
|
||||||
} else {
|
// then there may be a job in the queue that will create a new segment file with an ID
|
||||||
// In case of error, set it to the last file number on the disk that was not deleted.
|
// generated before the sequence reset.
|
||||||
cdm.evtlPos.setSeq(uint64(pendingDeletes[len(pendingDeletes)-1]))
|
//
|
||||||
|
// The queueIsEmpty() function must be called while holding the cdm.evtlPosMtx to avoid
|
||||||
|
// a race condition with WriteChunk().
|
||||||
|
if cdm.writeQueue == nil || cdm.writeQueue.queueIsEmpty() {
|
||||||
|
if err == nil {
|
||||||
|
cdm.evtlPos.setSeq(0)
|
||||||
|
} else {
|
||||||
|
// In case of error, set it to the last file number on the disk that was not deleted.
|
||||||
|
cdm.evtlPos.setSeq(uint64(pendingDeletes[len(pendingDeletes)-1]))
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
cdm.evtlPosMtx.Unlock()
|
cdm.evtlPosMtx.Unlock()
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -19,7 +19,9 @@ import (
|
|||||||
"math/rand"
|
"math/rand"
|
||||||
"os"
|
"os"
|
||||||
"strconv"
|
"strconv"
|
||||||
|
"sync"
|
||||||
"testing"
|
"testing"
|
||||||
|
"time"
|
||||||
|
|
||||||
"github.com/stretchr/testify/require"
|
"github.com/stretchr/testify/require"
|
||||||
|
|
||||||
@ -356,6 +358,56 @@ func TestChunkDiskMapper_Truncate_PreservesFileSequence(t *testing.T) {
|
|||||||
verifyFiles([]int{5, 6, 7})
|
verifyFiles([]int{5, 6, 7})
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func TestChunkDiskMapper_Truncate_WriteQueueRaceCondition(t *testing.T) {
|
||||||
|
hrw := createChunkDiskMapper(t, "")
|
||||||
|
t.Cleanup(func() {
|
||||||
|
require.NoError(t, hrw.Close())
|
||||||
|
})
|
||||||
|
|
||||||
|
// This test should only run when the queue is enabled.
|
||||||
|
if hrw.writeQueue == nil {
|
||||||
|
t.Skip("This test should only run when the queue is enabled")
|
||||||
|
}
|
||||||
|
|
||||||
|
// Add an artificial delay in the writeChunk function to easily trigger the race condition.
|
||||||
|
origWriteChunk := hrw.writeQueue.writeChunk
|
||||||
|
hrw.writeQueue.writeChunk = func(seriesRef HeadSeriesRef, mint, maxt int64, chk chunkenc.Chunk, ref ChunkDiskMapperRef, isOOO, cutFile bool) error {
|
||||||
|
time.Sleep(100 * time.Millisecond)
|
||||||
|
return origWriteChunk(seriesRef, mint, maxt, chk, ref, isOOO, cutFile)
|
||||||
|
}
|
||||||
|
|
||||||
|
wg := sync.WaitGroup{}
|
||||||
|
wg.Add(2)
|
||||||
|
|
||||||
|
// Write a chunk. Since the queue is enabled, the chunk will be written asynchronously (with the artificial delay).
|
||||||
|
ref := hrw.WriteChunk(1, 0, 10, randomChunk(t), false, func(err error) {
|
||||||
|
defer wg.Done()
|
||||||
|
require.NoError(t, err)
|
||||||
|
})
|
||||||
|
|
||||||
|
seq, _ := ref.Unpack()
|
||||||
|
require.Equal(t, 1, seq)
|
||||||
|
|
||||||
|
// Truncate, simulating that all chunks from segment files before 1 can be dropped.
|
||||||
|
require.NoError(t, hrw.Truncate(1))
|
||||||
|
|
||||||
|
// Request to cut a new file when writing the next chunk. If there's a race condition, cutting a new file will
|
||||||
|
// allow us to detect there's actually an issue with the sequence number (because it's checked when a new segment
|
||||||
|
// file is created).
|
||||||
|
hrw.CutNewFile()
|
||||||
|
|
||||||
|
// Write another chunk. This will cut a new file.
|
||||||
|
ref = hrw.WriteChunk(1, 0, 10, randomChunk(t), false, func(err error) {
|
||||||
|
defer wg.Done()
|
||||||
|
require.NoError(t, err)
|
||||||
|
})
|
||||||
|
|
||||||
|
seq, _ = ref.Unpack()
|
||||||
|
require.Equal(t, 2, seq)
|
||||||
|
|
||||||
|
wg.Wait()
|
||||||
|
}
|
||||||
|
|
||||||
// TestHeadReadWriter_TruncateAfterIterateChunksError tests for
|
// TestHeadReadWriter_TruncateAfterIterateChunksError tests for
|
||||||
// https://github.com/prometheus/prometheus/issues/7753
|
// https://github.com/prometheus/prometheus/issues/7753
|
||||||
func TestHeadReadWriter_TruncateAfterFailedIterateChunks(t *testing.T) {
|
func TestHeadReadWriter_TruncateAfterFailedIterateChunks(t *testing.T) {
|
||||||
|
Loading…
Reference in New Issue
Block a user