Fix head stats and hooks when replaying a corrupted snapshot (#14079)

* Fixing head stats and hooks when replaying a corrupted snapshot

Signed-off-by: alanprot <alanprot@gmail.com>

* Fixing create/removed series metrics

Signed-off-by: alanprot <alanprot@gmail.com>

* Refactoring to have common code between gc and flush method

Signed-off-by: alanprot <alanprot@gmail.com>

* Update tsdb/head.go

Co-authored-by: Ayoub Mrini <ayoubmrini424@gmail.com>
Signed-off-by: Alan Protasio <alanprot@gmail.com>

* refactor

Signed-off-by: alanprot <alanprot@gmail.com>

* Update tsdb/head_test.go

Co-authored-by: Ganesh Vernekar <ganeshvern@gmail.com>
Signed-off-by: Alan Protasio <alanprot@gmail.com>

* Update tsdb/head_test.go

Co-authored-by: Ganesh Vernekar <ganeshvern@gmail.com>
Signed-off-by: Alan Protasio <alanprot@gmail.com>

---------

Signed-off-by: alanprot <alanprot@gmail.com>
Signed-off-by: Alan Protasio <alanprot@gmail.com>
Co-authored-by: Ayoub Mrini <ayoubmrini424@gmail.com>
Co-authored-by: Ganesh Vernekar <ganeshvern@gmail.com>
This commit is contained in:
Alan Protasio 2024-05-24 19:43:21 -07:00 committed by GitHub
parent c0221d9739
commit 8894d65cd6
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
2 changed files with 94 additions and 28 deletions

View File

@ -310,12 +310,22 @@ func (h *Head) resetInMemoryState() error {
return err return err
} }
if h.series != nil {
// reset the existing series to make sure we call the appropriated hooks
// and increment the series removed metrics
fs := h.series.iterForDeletion(func(_ int, _ uint64, s *memSeries, flushedForCallback map[chunks.HeadSeriesRef]labels.Labels) {
// All series should be flushed
flushedForCallback[s.ref] = s.lset
})
h.metrics.seriesRemoved.Add(float64(fs))
}
h.series = newStripeSeries(h.opts.StripeSize, h.opts.SeriesCallback)
h.iso = newIsolation(h.opts.IsolationDisabled) h.iso = newIsolation(h.opts.IsolationDisabled)
h.oooIso = newOOOIsolation() h.oooIso = newOOOIsolation()
h.numSeries.Store(0)
h.exemplarMetrics = em h.exemplarMetrics = em
h.exemplars = es h.exemplars = es
h.series = newStripeSeries(h.opts.StripeSize, h.opts.SeriesCallback)
h.postings = index.NewUnorderedMemPostings() h.postings = index.NewUnorderedMemPostings()
h.tombstones = tombstones.NewMemTombstones() h.tombstones = tombstones.NewMemTombstones()
h.deleted = map[chunks.HeadSeriesRef]int{} h.deleted = map[chunks.HeadSeriesRef]int{}
@ -1861,11 +1871,10 @@ func newStripeSeries(stripeSize int, seriesCallback SeriesLifecycleCallback) *st
// minMmapFile is the min mmap file number seen in the series (in-order and out-of-order) after gc'ing the series. // minMmapFile is the min mmap file number seen in the series (in-order and out-of-order) after gc'ing the series.
func (s *stripeSeries) gc(mint int64, minOOOMmapRef chunks.ChunkDiskMapperRef) (_ map[storage.SeriesRef]struct{}, _ int, _, _ int64, minMmapFile int) { func (s *stripeSeries) gc(mint int64, minOOOMmapRef chunks.ChunkDiskMapperRef) (_ map[storage.SeriesRef]struct{}, _ int, _, _ int64, minMmapFile int) {
var ( var (
deleted = map[storage.SeriesRef]struct{}{} deleted = map[storage.SeriesRef]struct{}{}
rmChunks = 0 rmChunks = 0
actualMint int64 = math.MaxInt64 actualMint int64 = math.MaxInt64
minOOOTime int64 = math.MaxInt64 minOOOTime int64 = math.MaxInt64
deletedFromPrevStripe = 0
) )
minMmapFile = math.MaxInt32 minMmapFile = math.MaxInt32
@ -1923,27 +1932,7 @@ func (s *stripeSeries) gc(mint int64, minOOOMmapRef chunks.ChunkDiskMapperRef) (
deletedForCallback[series.ref] = series.lset deletedForCallback[series.ref] = series.lset
} }
// Run through all series shard by shard, checking which should be deleted. s.iterForDeletion(check)
for i := 0; i < s.size; i++ {
deletedForCallback := make(map[chunks.HeadSeriesRef]labels.Labels, deletedFromPrevStripe)
s.locks[i].Lock()
// Delete conflicts first so seriesHashmap.del doesn't move them to the `unique` field,
// after deleting `unique`.
for hash, all := range s.hashes[i].conflicts {
for _, series := range all {
check(i, hash, series, deletedForCallback)
}
}
for hash, series := range s.hashes[i].unique {
check(i, hash, series, deletedForCallback)
}
s.locks[i].Unlock()
s.seriesLifecycleCallback.PostDeletion(deletedForCallback)
deletedFromPrevStripe = len(deletedForCallback)
}
if actualMint == math.MaxInt64 { if actualMint == math.MaxInt64 {
actualMint = mint actualMint = mint
@ -1952,6 +1941,35 @@ func (s *stripeSeries) gc(mint int64, minOOOMmapRef chunks.ChunkDiskMapperRef) (
return deleted, rmChunks, actualMint, minOOOTime, minMmapFile return deleted, rmChunks, actualMint, minOOOTime, minMmapFile
} }
// The iterForDeletion function iterates through all series, invoking the checkDeletedFunc for each.
// The checkDeletedFunc takes a map as input and should add to it all series that were deleted and should be included
// when invoking the PostDeletion hook.
func (s *stripeSeries) iterForDeletion(checkDeletedFunc func(int, uint64, *memSeries, map[chunks.HeadSeriesRef]labels.Labels)) int {
seriesSetFromPrevStripe := 0
totalDeletedSeries := 0
// Run through all series shard by shard
for i := 0; i < s.size; i++ {
seriesSet := make(map[chunks.HeadSeriesRef]labels.Labels, seriesSetFromPrevStripe)
s.locks[i].Lock()
// Iterate conflicts first so f doesn't move them to the `unique` field,
// after deleting `unique`.
for hash, all := range s.hashes[i].conflicts {
for _, series := range all {
checkDeletedFunc(i, hash, series, seriesSet)
}
}
for hash, series := range s.hashes[i].unique {
checkDeletedFunc(i, hash, series, seriesSet)
}
s.locks[i].Unlock()
s.seriesLifecycleCallback.PostDeletion(seriesSet)
totalDeletedSeries += len(seriesSet)
seriesSetFromPrevStripe = len(seriesSet)
}
return totalDeletedSeries
}
func (s *stripeSeries) getByID(id chunks.HeadSeriesRef) *memSeries { func (s *stripeSeries) getByID(id chunks.HeadSeriesRef) *memSeries {
i := uint64(id) & uint64(s.size-1) i := uint64(id) & uint64(s.size-1)

View File

@ -4007,6 +4007,9 @@ func TestSnapshotError(t *testing.T) {
require.NoError(t, err) require.NoError(t, err)
f, err := os.OpenFile(path.Join(snapDir, files[0].Name()), os.O_RDWR, 0) f, err := os.OpenFile(path.Join(snapDir, files[0].Name()), os.O_RDWR, 0)
require.NoError(t, err) require.NoError(t, err)
// Create snapshot backup to be restored on future test cases.
snapshotBackup, err := io.ReadAll(f)
require.NoError(t, err)
_, err = f.WriteAt([]byte{0b11111111}, 18) _, err = f.WriteAt([]byte{0b11111111}, 18)
require.NoError(t, err) require.NoError(t, err)
require.NoError(t, f.Close()) require.NoError(t, f.Close())
@ -4021,10 +4024,44 @@ func TestSnapshotError(t *testing.T) {
// There should be no series in the memory after snapshot error since WAL was removed. // There should be no series in the memory after snapshot error since WAL was removed.
require.Equal(t, 1.0, prom_testutil.ToFloat64(head.metrics.snapshotReplayErrorTotal)) require.Equal(t, 1.0, prom_testutil.ToFloat64(head.metrics.snapshotReplayErrorTotal))
require.Equal(t, uint64(0), head.NumSeries())
require.Nil(t, head.series.getByHash(lbls.Hash(), lbls)) require.Nil(t, head.series.getByHash(lbls.Hash(), lbls))
tm, err = head.tombstones.Get(1) tm, err = head.tombstones.Get(1)
require.NoError(t, err) require.NoError(t, err)
require.Empty(t, tm) require.Empty(t, tm)
require.NoError(t, head.Close())
// Test corruption in the middle of the snapshot.
f, err = os.OpenFile(path.Join(snapDir, files[0].Name()), os.O_RDWR, 0)
require.NoError(t, err)
_, err = f.WriteAt(snapshotBackup, 0)
require.NoError(t, err)
_, err = f.WriteAt([]byte{0b11111111}, 300)
require.NoError(t, err)
require.NoError(t, f.Close())
c := &countSeriesLifecycleCallback{}
opts := head.opts
opts.SeriesCallback = c
w, err = wlog.NewSize(nil, nil, head.wal.Dir(), 32768, wlog.CompressionNone)
require.NoError(t, err)
head, err = NewHead(prometheus.NewRegistry(), nil, w, nil, head.opts, nil)
require.NoError(t, err)
require.NoError(t, head.Init(math.MinInt64))
// There should be no series in the memory after snapshot error since WAL was removed.
require.Equal(t, 1.0, prom_testutil.ToFloat64(head.metrics.snapshotReplayErrorTotal))
require.Nil(t, head.series.getByHash(lbls.Hash(), lbls))
require.Equal(t, uint64(0), head.NumSeries())
// Since the snapshot could replay certain series, we continue invoking the create hooks.
// In such instances, we need to ensure that we also trigger the delete hooks when resetting the memory.
require.Equal(t, int64(2), c.created.Load())
require.Equal(t, int64(2), c.deleted.Load())
require.Equal(t, 2.0, prom_testutil.ToFloat64(head.metrics.seriesRemoved))
require.Equal(t, 2.0, prom_testutil.ToFloat64(head.metrics.seriesCreated))
} }
func TestHistogramMetrics(t *testing.T) { func TestHistogramMetrics(t *testing.T) {
@ -5829,3 +5866,14 @@ func TestHeadCompactableDoesNotCompactEmptyHead(t *testing.T) {
require.False(t, head.compactable()) require.False(t, head.compactable())
} }
type countSeriesLifecycleCallback struct {
created atomic.Int64
deleted atomic.Int64
}
func (c *countSeriesLifecycleCallback) PreCreation(labels.Labels) error { return nil }
func (c *countSeriesLifecycleCallback) PostCreation(labels.Labels) { c.created.Inc() }
func (c *countSeriesLifecycleCallback) PostDeletion(s map[chunks.HeadSeriesRef]labels.Labels) {
c.deleted.Add(int64(len(s)))
}