Fix chunk overflow appending samples at a variable rate (#10607)
* Add a test with variable samples rate append This test overflows the chunk created in memseries, and the total amount of samples in the (only) mmapped chunk is 29, instead of the 65565 appended ones. Signed-off-by: Oleg Zaytsev <mail@olegzaytsev.com> * Cut new chunk when rate prediction was wrong When appending samples at a slow rate, and then appending at a higher rate, the prediction we made to cut a new chunk is no longer valid. Sometimes this can even cause an overflow in the chunk, if more samples than uint16 can hold are appended. Signed-off-by: Oleg Zaytsev <mail@olegzaytsev.com> * Improve comment on 2*samplesPerChunk Signed-off-by: Oleg Zaytsev <mail@olegzaytsev.com> * Assert that all chunks have less than 240 samples Also, trigger new chunk at 240, not at more than 240 Signed-off-by: Oleg Zaytsev <mail@olegzaytsev.com>
This commit is contained in:
parent
40c1efe8bc
commit
af0f6da5cb
|
@ -519,7 +519,12 @@ func (s *memSeries) append(t int64, v float64, appendID uint64, chunkDiskMapper
|
|||
if numSamples == samplesPerChunk/4 {
|
||||
s.nextAt = computeChunkEndTime(c.minTime, c.maxTime, s.nextAt)
|
||||
}
|
||||
if t >= s.nextAt {
|
||||
// If numSamples > samplesPerChunk*2 then our previous prediction was invalid,
|
||||
// most likely because samples rate has changed and now they are arriving more frequently.
|
||||
// Since we assume that the rate is higher, we're being conservative and cutting at 2*samplesPerChunk
|
||||
// as we expect more chunks to come.
|
||||
// Note that next chunk will have its nextAt recalculated for the new rate.
|
||||
if t >= s.nextAt || numSamples >= samplesPerChunk*2 {
|
||||
c = s.cutNewHeadChunk(t, chunkDiskMapper)
|
||||
chunkCreated = true
|
||||
}
|
||||
|
|
|
@ -1308,6 +1308,50 @@ func TestMemSeries_append(t *testing.T) {
|
|||
}
|
||||
}
|
||||
|
||||
func TestMemSeries_append_atVariableRate(t *testing.T) {
|
||||
const samplesPerChunk = 120
|
||||
dir := t.TempDir()
|
||||
// This is usually taken from the Head, but passing manually here.
|
||||
chunkDiskMapper, err := chunks.NewChunkDiskMapper(nil, dir, chunkenc.NewPool(), chunks.DefaultWriteBufferSize, chunks.DefaultWriteQueueSize)
|
||||
require.NoError(t, err)
|
||||
t.Cleanup(func() {
|
||||
require.NoError(t, chunkDiskMapper.Close())
|
||||
})
|
||||
|
||||
s := newMemSeries(labels.Labels{}, 1, DefaultBlockDuration, nil, defaultIsolationDisabled)
|
||||
|
||||
// At this slow rate, we will fill the chunk in two block durations.
|
||||
slowRate := (DefaultBlockDuration * 2) / samplesPerChunk
|
||||
|
||||
var nextTs int64
|
||||
var totalAppendedSamples int
|
||||
for i := 0; i < samplesPerChunk/4; i++ {
|
||||
ok, _ := s.append(nextTs, float64(i), 0, chunkDiskMapper)
|
||||
require.Truef(t, ok, "slow sample %d was not appended", i)
|
||||
nextTs += slowRate
|
||||
totalAppendedSamples++
|
||||
}
|
||||
require.Equal(t, DefaultBlockDuration, s.nextAt, "after appending a samplesPerChunk/4 samples at a slow rate, we should aim to cut a new block at the default block duration %d, but it's set to %d", DefaultBlockDuration, s.nextAt)
|
||||
|
||||
// Suddenly, the rate increases and we receive a sample every millisecond.
|
||||
for i := 0; i < math.MaxUint16; i++ {
|
||||
ok, _ := s.append(nextTs, float64(i), 0, chunkDiskMapper)
|
||||
require.Truef(t, ok, "quick sample %d was not appended", i)
|
||||
nextTs++
|
||||
totalAppendedSamples++
|
||||
}
|
||||
ok, chunkCreated := s.append(DefaultBlockDuration, float64(0), 0, chunkDiskMapper)
|
||||
require.True(t, ok, "new chunk sample was not appended")
|
||||
require.True(t, chunkCreated, "sample at block duration timestamp should create a new chunk")
|
||||
|
||||
var totalSamplesInChunks int
|
||||
for i, c := range s.mmappedChunks {
|
||||
totalSamplesInChunks += int(c.numSamples)
|
||||
require.LessOrEqualf(t, c.numSamples, uint16(2*samplesPerChunk), "mmapped chunk %d has more than %d samples", i, 2*samplesPerChunk)
|
||||
}
|
||||
require.Equal(t, totalAppendedSamples, totalSamplesInChunks, "wrong number of samples in %d mmapped chunks", len(s.mmappedChunks))
|
||||
}
|
||||
|
||||
func TestGCChunkAccess(t *testing.T) {
|
||||
// Put a chunk, select it. GC it and then access it.
|
||||
h, _ := newTestHead(t, 1000, false)
|
||||
|
|
Loading…
Reference in New Issue