Enhanced WAL replay for duplicate series record ()

Signed-off-by: Ganesh Vernekar <ganeshvern@gmail.com>
This commit is contained in:
Ganesh Vernekar 2021-08-03 20:03:54 +05:30 committed by GitHub
parent 8002a3ab80
commit 848cb5a6d6
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 65 additions and 22 deletions

View File

@ -1466,7 +1466,11 @@ type memChunk struct {
// OverlapsClosedInterval returns true if the chunk overlaps [mint, maxt].
func (mc *memChunk) OverlapsClosedInterval(mint, maxt int64) bool {
return mc.minTime <= maxt && mint <= mc.maxTime
return overlapsClosedInterval(mc.minTime, mc.maxTime, mint, maxt)
}
func overlapsClosedInterval(mint1, maxt1, mint2, maxt2 int64) bool {
return mint1 <= maxt2 && mint2 <= maxt1
}
type mmappedChunk struct {
@ -1477,7 +1481,7 @@ type mmappedChunk struct {
// Returns true if the chunk overlaps [mint, maxt].
func (mc *mmappedChunk) OverlapsClosedInterval(mint, maxt int64) bool {
return mc.minTime <= maxt && mint <= mc.maxTime
return overlapsClosedInterval(mc.minTime, mc.maxTime, mint, maxt)
}
type noopSeriesLifecycleCallback struct{}

View File

@ -306,7 +306,9 @@ func TestHead_ReadWAL(t *testing.T) {
}
require.Equal(t, []sample{{100, 2}, {101, 5}}, expandChunk(s10.iterator(0, nil, head.chunkDiskMapper, nil)))
require.Equal(t, []sample{{101, 6}}, expandChunk(s50.iterator(0, nil, head.chunkDiskMapper, nil)))
require.Equal(t, []sample{{100, 3}, {101, 7}}, expandChunk(s100.iterator(0, nil, head.chunkDiskMapper, nil)))
// The samples before the new series record should be discarded since a duplicate record
// is only possible when old samples were compacted.
require.Equal(t, []sample{{101, 7}}, expandChunk(s100.iterator(0, nil, head.chunkDiskMapper, nil)))
q, err := head.ExemplarQuerier(context.Background())
require.NoError(t, err)
@ -369,9 +371,9 @@ func TestHead_WALMultiRef(t *testing.T) {
q, err := NewBlockQuerier(head, 0, 2100)
require.NoError(t, err)
series := query(t, q, labels.MustNewMatcher(labels.MatchEqual, "foo", "bar"))
// The samples before the new ref should be discarded since Head truncation
// happens only after compacting the Head.
require.Equal(t, map[string][]tsdbutil.Sample{`{foo="bar"}`: {
sample{100, 1},
sample{1500, 2},
sample{1700, 3},
sample{2000, 4},
}}, series)

View File

@ -18,6 +18,7 @@ import (
"math"
"runtime"
"sync"
"time"
"github.com/go-kit/log/level"
"github.com/pkg/errors"
@ -181,38 +182,74 @@ func (h *Head) loadWAL(r *wal.Reader, multiRef map[uint64]uint64, mmappedChunks
}
}()
// The records are always replayed from the oldest to the newest.
Outer:
for d := range decoded {
switch v := d.(type) {
case []record.RefSeries:
for _, s := range v {
series, created, err := h.getOrCreateWithID(s.Ref, s.Labels.Hash(), s.Labels)
for _, walSeries := range v {
mSeries, created, err := h.getOrCreateWithID(walSeries.Ref, walSeries.Labels.Hash(), walSeries.Labels)
if err != nil {
seriesCreationErr = err
break Outer
}
if h.lastSeriesID.Load() < walSeries.Ref {
h.lastSeriesID.Store(walSeries.Ref)
}
mmc := mmappedChunks[walSeries.Ref]
if created {
// If this series gets a duplicate record, we don't restore its mmapped chunks,
// and instead restore everything from WAL records.
series.mmappedChunks = mmappedChunks[series.ref]
// This is the first WAL series record for this series.
h.metrics.chunksCreated.Add(float64(len(mmc)))
h.metrics.chunks.Add(float64(len(mmc)))
mSeries.mmappedChunks = mmc
continue
}
h.metrics.chunks.Add(float64(len(series.mmappedChunks)))
h.metrics.chunksCreated.Add(float64(len(series.mmappedChunks)))
// There's already a different ref for this series.
// A duplicate series record is only possible when the old samples were already compacted into a block.
// Hence we can discard all the samples and m-mapped chunks replayed till now for this series.
if len(series.mmappedChunks) > 0 {
h.updateMinMaxTime(series.minTime(), series.maxTime())
multiRef[walSeries.Ref] = mSeries.ref
idx := mSeries.ref % uint64(n)
// It is possible that some old sample is being processed in processWALSamples that
// could cause race below. So we wait for the goroutine to empty input the buffer and finish
// processing all old samples after emptying the buffer.
inputs[idx] <- []record.RefSample{}
for len(inputs[idx]) != 0 {
time.Sleep(1 * time.Millisecond)
}
// Checking if the new m-mapped chunks overlap with the already existing ones.
// This should never happen, but we have a check anyway to detect any
// edge cases that we might have missed.
if len(mSeries.mmappedChunks) > 0 && len(mmc) > 0 {
if overlapsClosedInterval(
mSeries.mmappedChunks[0].minTime,
mSeries.mmappedChunks[len(mSeries.mmappedChunks)-1].maxTime,
mmc[0].minTime,
mmc[len(mmc)-1].maxTime,
) {
// The m-map chunks for the new series ref overlaps with old m-map chunks.
seriesCreationErr = errors.Errorf("overlapping m-mapped chunks for series %s", mSeries.lset.String())
break Outer
}
} else {
// TODO(codesome) Discard old samples and mmapped chunks and use mmap chunks for the new series ID.
// There's already a different ref for this series.
multiRef[s.Ref] = series.ref
}
if h.lastSeriesID.Load() < s.Ref {
h.lastSeriesID.Store(s.Ref)
}
// Replacing m-mapped chunks with the new ones (could be empty).
h.metrics.chunksCreated.Add(float64(len(mmc)))
h.metrics.chunksRemoved.Add(float64(len(mSeries.mmappedChunks)))
h.metrics.chunks.Add(float64(len(mmc) - len(mSeries.mmappedChunks)))
mSeries.mmappedChunks = mmc
// Any samples replayed till now would already be compacted. Resetting the head chunk.
mSeries.nextAt = 0
mSeries.headChunk = nil
mSeries.app = nil
h.updateMinMaxTime(mSeries.minTime(), mSeries.maxTime())
}
//nolint:staticcheck // Ignore SA6002 relax staticcheck verification.
seriesPool.Put(v)