Fix bugs and add enhancements to the chunk snapshot (#9185)

Signed-off-by: Ganesh Vernekar <ganeshvern@gmail.com>
Ganesh Vernekar, 2021-08-17 22:38:16 +05:30, committed by GitHub
parent bb05485c79
commit 328a74ca36
3 changed files with 151 additions and 34 deletions

View File

@ -15,6 +15,7 @@ package tsdb
import (
"fmt"
"io"
"math"
"path/filepath"
"sync"
@ -173,28 +174,14 @@ func NewHead(r prometheus.Registerer, l log.Logger, wal *wal.WAL, opts *HeadOpti
opts.SeriesCallback = &noopSeriesLifecycleCallback{}
}
- em := NewExemplarMetrics(r)
- es, err := NewCircularExemplarStorage(opts.MaxExemplars.Load(), em)
- if err != nil {
- return nil, err
- }
if stats == nil {
stats = NewHeadStats()
}
h := &Head{
- wal: wal,
- logger: l,
- opts: opts,
- exemplarMetrics: em,
- exemplars: es,
- series: newStripeSeries(opts.StripeSize, opts.SeriesCallback),
- symbols: map[string]struct{}{},
- postings: index.NewUnorderedMemPostings(),
- tombstones: tombstones.NewMemTombstones(),
- iso: newIsolation(),
- deleted: map[uint64]int{},
+ wal: wal,
+ logger: l,
+ opts: opts,
memChunkPool: sync.Pool{
New: func() interface{} {
return &memChunk{}
@ -203,11 +190,9 @@ func NewHead(r prometheus.Registerer, l log.Logger, wal *wal.WAL, opts *HeadOpti
stats: stats,
reg: r,
}
- h.chunkRange.Store(opts.ChunkRange)
- h.minTime.Store(math.MaxInt64)
- h.maxTime.Store(math.MinInt64)
- h.lastWALTruncationTime.Store(math.MinInt64)
- h.lastMemoryTruncationTime.Store(math.MinInt64)
+ if err := h.resetInMemoryState(); err != nil {
+ return nil, err
+ }
h.metrics = newHeadMetrics(h, r)
if opts.ChunkPool == nil {
@ -226,6 +211,30 @@ func NewHead(r prometheus.Registerer, l log.Logger, wal *wal.WAL, opts *HeadOpti
return h, nil
}
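As a usage sketch (not part of this commit): a hypothetical openHead helper wiring the new snapshot path end to end, following the test code further below. The option and field names are taken from this diff and package; `dir` and the helper itself are illustrative.

func openHead(dir string) (*Head, error) {
	opts := DefaultHeadOptions()
	opts.ChunkDirRoot = dir
	opts.EnableMemorySnapshotOnShutdown = true // write a snapshot on Close, replay it on Init

	w, err := wal.NewSize(nil, nil, dir, 32768, false)
	if err != nil {
		return nil, err
	}
	h, err := NewHead(nil, nil, w, opts, nil)
	if err != nil {
		return nil, err
	}
	// Init tries the snapshot first; on failure it increments
	// prometheus_tsdb_snapshot_replay_error_total, resets the in-memory
	// state, and falls back to a full WAL replay.
	if err := h.Init(math.MinInt64); err != nil {
		return nil, err
	}
	return h, nil
}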
func (h *Head) resetInMemoryState() error {
var err error
em := NewExemplarMetrics(h.reg)
es, err := NewCircularExemplarStorage(h.opts.MaxExemplars.Load(), em)
if err != nil {
return err
}
h.exemplarMetrics = em
h.exemplars = es
h.series = newStripeSeries(h.opts.StripeSize, h.opts.SeriesCallback)
h.symbols = map[string]struct{}{}
h.postings = index.NewUnorderedMemPostings()
h.tombstones = tombstones.NewMemTombstones()
h.iso = newIsolation()
h.deleted = map[uint64]int{}
h.chunkRange.Store(h.opts.ChunkRange)
h.minTime.Store(math.MaxInt64)
h.maxTime.Store(math.MinInt64)
h.lastWALTruncationTime.Store(math.MinInt64)
h.lastMemoryTruncationTime.Store(math.MinInt64)
return nil
}
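resetInMemoryState exists so the snapshot replay in Init (below) can fail without poisoning the subsequent WAL replay: a replay that dies halfway leaves series, postings, and tombstones partially populated. A toy model of the hazard, with stand-in types rather than the real Head:

// Toy illustration only, not Prometheus code.
type toyHead struct{ samples map[uint64]int }

func (h *toyHead) reset() { h.samples = map[uint64]int{} }

func (h *toyHead) replay(snapshotErr error) {
	if snapshotErr != nil {
		// Without this reset, the WAL replay below would stack samples on
		// top of whatever the half-finished snapshot load left behind.
		h.reset()
	}
	// ... replay WAL records into h.samples ...
}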
type headMetrics struct {
activeAppenders prometheus.Gauge
series prometheus.GaugeFunc
@ -249,6 +258,7 @@ type headMetrics struct {
checkpointCreationFail prometheus.Counter
checkpointCreationTotal prometheus.Counter
mmapChunkCorruptionTotal prometheus.Counter
snapshotReplayErrorTotal prometheus.Counter // Will be either 0 or 1.
}
func newHeadMetrics(h *Head, r prometheus.Registerer) *headMetrics {
@ -343,6 +353,10 @@ func newHeadMetrics(h *Head, r prometheus.Registerer) *headMetrics {
Name: "prometheus_tsdb_mmap_chunk_corruptions_total",
Help: "Total number of memory-mapped chunk corruptions.",
}),
snapshotReplayErrorTotal: prometheus.NewCounter(prometheus.CounterOpts{
Name: "prometheus_tsdb_snapshot_replay_error_total",
Help: "Total number of snapshot replays that failed.",
}),
}
if r != nil {
@ -369,6 +383,7 @@ func newHeadMetrics(h *Head, r prometheus.Registerer) *headMetrics {
m.checkpointCreationFail,
m.checkpointCreationTotal,
m.mmapChunkCorruptionTotal,
m.snapshotReplayErrorTotal,
// Metrics bound to functions and not needed in tests
// can be created and registered on the spot.
prometheus.NewGaugeFunc(prometheus.GaugeOpts{
@ -439,7 +454,7 @@ const cardinalityCacheExpirationTime = time.Duration(30) * time.Second
// Init loads data from the write ahead log and prepares the head for writes.
// It should be called before using an appender so that it
// limits the ingested samples to the head min valid time.
- func (h *Head) Init(minValidTime int64) error {
+ func (h *Head) Init(minValidTime int64) (err error) {
h.minValidTime.Store(minValidTime)
defer h.postings.EnsureOrder()
defer h.gc() // After loading the wal remove the obsolete data from the head.
@ -454,11 +469,23 @@ func (h *Head) Init(minValidTime int64) error {
level.Info(h.logger).Log("msg", "Replaying on-disk memory mappable chunks if any")
start := time.Now()
- snapIdx, snapOffset, refSeries, err := h.loadChunkSnapshot()
- if err != nil {
- return err
- }
- level.Info(h.logger).Log("msg", "Chunk snapshot loading time", "duration", time.Since(start).String())
+ snapIdx, snapOffset := -1, 0
+ refSeries := make(map[uint64]*memSeries)
+ if h.opts.EnableMemorySnapshotOnShutdown {
+ level.Info(h.logger).Log("msg", "Chunk snapshot is enabled, replaying from the snapshot")
+ snapIdx, snapOffset, refSeries, err = h.loadChunkSnapshot()
+ if err != nil {
+ snapIdx, snapOffset = -1, 0
+ h.metrics.snapshotReplayErrorTotal.Inc()
+ level.Error(h.logger).Log("msg", "Failed to load chunk snapshot", "err", err)
+ // We clear the partially loaded data to replay fresh from the WAL.
+ if err := h.resetInMemoryState(); err != nil {
+ return err
+ }
+ }
+ level.Info(h.logger).Log("msg", "Chunk snapshot loading time", "duration", time.Since(start).String())
+ }
mmapChunkReplayStart := time.Now()
mmappedChunks, err := h.loadMmappedChunks(refSeries)
@ -535,6 +562,10 @@ func (h *Head) Init(minValidTime int64) error {
offset = snapOffset
}
sr, err := wal.NewSegmentBufReaderWithOffset(offset, s)
if errors.Cause(err) == io.EOF {
// File does not exist.
continue
}
if err != nil {
return errors.Wrapf(err, "segment reader (offset=%d)", offset)
}
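The io.EOF branch above tolerates a WAL segment that no longer exists, and the `offset = snapOffset` assignment earlier in this hunk resumes replay mid-segment at the position the snapshot recorded. A hypothetical helper, written only to make the resume rule explicit (name and shape are illustrative, not part of the commit):

// startFrom decides where WAL replay begins after a snapshot load.
// snapIdx == -1 means no usable snapshot: replay every segment in full.
func startFrom(snapIdx, snapOffset, firstSegment int) (segment, offset int) {
	if snapIdx < firstSegment {
		return firstSegment, 0
	}
	return snapIdx, snapOffset
}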
@ -1478,10 +1509,13 @@ func (s *memSeries) minTime() int64 {
func (s *memSeries) maxTime() int64 {
c := s.head()
- if c == nil {
- return math.MinInt64
- }
- return c.maxTime
+ if c != nil {
+ return c.maxTime
+ }
+ if len(s.mmappedChunks) > 0 {
+ return s.mmappedChunks[len(s.mmappedChunks)-1].maxTime
+ }
+ return math.MinInt64
}
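The maxTime change above fixes a real bug for restored series: after a snapshot or m-map replay the open head chunk can be nil while m-mapped chunks still hold data, and the old code then reported math.MinInt64, making the series look empty. A compact, self-contained rendering of the corrected logic, with stand-in types rather than the real memSeries:

package main

import (
	"fmt"
	"math"
)

type chunk struct{ maxTime int64 }

type series struct {
	head          *chunk  // open in-memory chunk; may be nil right after replay
	mmappedChunks []chunk // closed chunks already written to disk
}

func (s *series) maxTime() int64 {
	if s.head != nil {
		return s.head.maxTime
	}
	// The fix: consult the newest m-mapped chunk before giving up.
	if n := len(s.mmappedChunks); n > 0 {
		return s.mmappedChunks[n-1].maxTime
	}
	return math.MinInt64
}

func main() {
	s := &series{mmappedChunks: []chunk{{maxTime: 120}, {maxTime: 240}}}
	fmt.Println(s.maxTime()) // prints 240, not math.MinInt64
}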
// truncateChunksBefore removes all chunks from the series that

View File

@ -21,6 +21,7 @@ import (
"math"
"math/rand"
"os"
"path"
"path/filepath"
"sort"
"strconv"
@ -2504,12 +2505,18 @@ func TestChunkSnapshot(t *testing.T) {
for i := 1; i <= numSeries; i++ {
lbls := labels.Labels{labels.Label{Name: "foo", Value: fmt.Sprintf("bar%d", i)}}
lblStr := lbls.String()
- // 240 samples should m-map at least 1 chunk.
- for ts := int64(1); ts <= 240; ts++ {
+ // Should m-map at least 1 chunk.
+ for ts := int64(1); ts <= 200; ts++ {
val := rand.Float64()
expSeries[lblStr] = append(expSeries[lblStr], sample{ts, val})
_, err := app.Append(0, lbls, ts, val)
require.NoError(t, err)
// To create multiple WAL records.
if ts%10 == 0 {
require.NoError(t, app.Commit())
app = head.Appender(context.Background())
}
}
}
require.NoError(t, app.Commit())
@ -2581,12 +2588,18 @@ func TestChunkSnapshot(t *testing.T) {
for i := 1; i <= numSeries; i++ {
lbls := labels.Labels{labels.Label{Name: "foo", Value: fmt.Sprintf("bar%d", i)}}
lblStr := lbls.String()
- // 240 samples should m-map at least 1 chunk.
- for ts := int64(241); ts <= 480; ts++ {
+ // Should m-map at least 1 chunk.
+ for ts := int64(201); ts <= 400; ts++ {
val := rand.Float64()
expSeries[lblStr] = append(expSeries[lblStr], sample{ts, val})
_, err := app.Append(0, lbls, ts, val)
require.NoError(t, err)
// To create multiple WAL records.
if ts%10 == 0 {
require.NoError(t, app.Commit())
app = head.Appender(context.Background())
}
}
}
require.NoError(t, app.Commit())
@ -2624,6 +2637,7 @@ func TestChunkSnapshot(t *testing.T) {
// Create new Head to replay snapshot, m-map chunks, and WAL.
w, err := wal.NewSize(nil, nil, head.wal.Dir(), 32768, false)
require.NoError(t, err)
head.opts.EnableMemorySnapshotOnShutdown = true // Enabled to read from snapshot.
head, err = NewHead(nil, nil, w, head.opts, nil)
require.NoError(t, err)
require.NoError(t, head.Init(math.MinInt64))
@ -2647,3 +2661,62 @@ func TestChunkSnapshot(t *testing.T) {
require.Equal(t, expTombstones, actTombstones)
}
}
func TestSnapshotError(t *testing.T) {
head, _ := newTestHead(t, 120*4, false)
defer func() {
head.opts.EnableMemorySnapshotOnShutdown = false
require.NoError(t, head.Close())
}()
// Add a sample.
app := head.Appender(context.Background())
lbls := labels.Labels{labels.Label{Name: "foo", Value: "bar"}}
_, err := app.Append(0, lbls, 99, 99)
require.NoError(t, err)
require.NoError(t, app.Commit())
// Add some tombstones.
itvs := tombstones.Intervals{
{Mint: 1234, Maxt: 2345},
{Mint: 3456, Maxt: 4567},
}
head.tombstones.AddInterval(1, itvs...)
// Check existence of data.
require.NotNil(t, head.series.getByHash(lbls.Hash(), lbls))
tm, err := head.tombstones.Get(1)
require.NoError(t, err)
require.NotEqual(t, 0, len(tm))
head.opts.EnableMemorySnapshotOnShutdown = true
require.NoError(t, head.Close()) // This will create a snapshot.
// Remove the WAL so that we don't load from it.
require.NoError(t, os.RemoveAll(head.wal.Dir()))
// Corrupt the snapshot.
snapDir, _, _, err := LastChunkSnapshot(head.opts.ChunkDirRoot)
require.NoError(t, err)
files, err := ioutil.ReadDir(snapDir)
require.NoError(t, err)
f, err := os.OpenFile(path.Join(snapDir, files[0].Name()), os.O_RDWR, 0)
require.NoError(t, err)
_, err = f.WriteAt([]byte{0b11111111}, 18)
require.NoError(t, err)
require.NoError(t, f.Close())
// Create new Head which should replay this snapshot.
w, err := wal.NewSize(nil, nil, head.wal.Dir(), 32768, false)
require.NoError(t, err)
head, err = NewHead(nil, nil, w, head.opts, nil)
require.NoError(t, err)
require.NoError(t, head.Init(math.MinInt64))
// There should be no series in memory after the snapshot error since the WAL was removed.
require.Equal(t, 1.0, prom_testutil.ToFloat64(head.metrics.snapshotReplayErrorTotal))
require.Nil(t, head.series.getByHash(lbls.Hash(), lbls))
tm, err = head.tombstones.Get(1)
require.NoError(t, err)
require.Equal(t, 0, len(tm))
}

View File

@ -735,6 +735,8 @@ func DeleteChunkSnapshots(dir string, maxIndex, maxOffset int) error {
return errs.Err()
}
// loadChunkSnapshot replays the chunk snapshot and restores the Head state from it. If an error is returned,
// it is the caller's responsibility to clear the contents of the Head.
func (h *Head) loadChunkSnapshot() (int, int, map[uint64]*memSeries, error) {
dir, snapIdx, snapOffset, err := LastChunkSnapshot(h.opts.ChunkDirRoot)
if err != nil {
@ -849,6 +851,10 @@ Outer:
loopErr = errors.Wrap(err, "iterate tombstones")
break Outer
}
default:
// This is a record type we don't understand. It is either an old format from an earlier version,
// or a new format written by a newer version before the code was rolled back.
loopErr = errors.Errorf("unsupported snapshot record type 0b%b", rec[0])
}
}
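The new default branch guards against snapshot records written by a different version of the code. A hedged sketch of that decoding convention follows; the record-type byte values and function name are assumptions for illustration, not the package's real constants:

import "github.com/pkg/errors"

const (
	recSeries     byte = 1 // assumed value
	recTombstones byte = 2 // assumed value
)

func decodeSnapshotRecord(rec []byte) error {
	if len(rec) == 0 {
		return errors.New("empty snapshot record")
	}
	switch rec[0] {
	case recSeries:
		// ... decode a series record ...
	case recTombstones:
		// ... decode a tombstones record ...
	default:
		// Unknown type byte: written by a newer version, or by an older one
		// after a format change. Fail loudly instead of guessing the layout.
		return errors.Errorf("unsupported snapshot record type 0b%b", rec[0])
	}
	return nil
}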
@ -864,6 +870,10 @@ Outer:
return -1, -1, nil, err
}
if r.Err() != nil {
return -1, -1, nil, errors.Wrap(r.Err(), "read records")
}
refSeries := make(map[uint64]*memSeries, numSeries)
for _, shard := range shardedRefSeries {
for k, v := range shard {