diff --git a/tsdb/head.go b/tsdb/head.go
index b7bfaa0fd..9d81b24ae 100644
--- a/tsdb/head.go
+++ b/tsdb/head.go
@@ -178,6 +178,7 @@ type HeadOptions struct {
 	WALReplayConcurrency int
 
 	// EnableSharding enables ShardedPostings() support in the Head.
+	// EnableSharding is temporarily disabled during Init().
 	EnableSharding bool
 }
 
@@ -609,7 +610,7 @@ const cardinalityCacheExpirationTime = time.Duration(30) * time.Second
 // Init loads data from the write ahead log and prepares the head for writes.
 // It should be called before using an appender so that it
 // limits the ingested samples to the head min valid time.
-func (h *Head) Init(minValidTime int64) error {
+func (h *Head) Init(minValidTime int64) (err error) {
 	h.minValidTime.Store(minValidTime)
 	defer func() {
 		h.postings.EnsureOrder(h.opts.WALReplayConcurrency)
@@ -623,6 +624,24 @@ func (h *Head) Init(minValidTime int64) error {
 		}
 	}()
 
+	// If sharding is enabled, disable it while initializing, and calculate the shard hashes later.
+	// We're going to use that field for other purposes during WAL replay,
+	// so there's no point in calculating shard hashes that would be overwritten anyway.
+	if h.opts.EnableSharding {
+		h.opts.EnableSharding = false
+		defer func() {
+			h.opts.EnableSharding = true
+			if err == nil {
+				// No locking is needed here as nobody should be writing while we're in Init.
+				for _, stripe := range h.series.series {
+					for _, s := range stripe {
+						s.shardHashOrMemoryMappedMaxTime = labels.StableHash(s.lset)
+					}
+				}
+			}
+		}()
+	}
+
 	level.Info(h.logger).Log("msg", "Replaying on-disk memory mappable chunks if any")
 	start := time.Now()
 
@@ -683,7 +702,6 @@ func (h *Head) Init(minValidTime int64) error {
 		mmappedChunks           map[chunks.HeadSeriesRef][]*mmappedChunk
 		oooMmappedChunks        map[chunks.HeadSeriesRef][]*mmappedChunk
 		lastMmapRef             chunks.ChunkDiskMapperRef
-		err                     error
 		mmapChunkReplayDuration time.Duration
 	)
 
@@ -2068,9 +2086,11 @@ type memSeries struct {
 	ref  chunks.HeadSeriesRef
 	meta *metadata.Metadata
 
-	// Series labels hash to use for sharding purposes. The value is always 0 when sharding has not
-	// been explicitly enabled in TSDB.
-	shardHash uint64
+	// Series labels hash to use for sharding purposes.
+	// The value is always 0 when sharding has not been explicitly enabled in TSDB.
+	// During WAL replay this field instead holds the max time of any mmapped chunk,
+	// and the shard hash is re-calculated once WAL replay completes.
+	shardHashOrMemoryMappedMaxTime uint64
 
 	// Everything after here should only be accessed with the lock held.
 	sync.Mutex
@@ -2095,8 +2115,6 @@ type memSeries struct {
 
 	ooo *memSeriesOOOFields
 
-	mmMaxTime int64 // Max time of any mmapped chunk, only used during WAL replay.
-
 	nextAt                           int64 // Timestamp at which to cut the next chunk.
 	histogramChunkHasComputedEndTime bool  // True if nextAt has been predicted for the current histograms chunk; false otherwise.
 	pendingCommit                    bool  // Whether there are samples waiting to be committed to this series.
@@ -2127,10 +2145,10 @@ type memSeriesOOOFields struct {
 
 func newMemSeries(lset labels.Labels, id chunks.HeadSeriesRef, shardHash uint64, isolationDisabled bool) *memSeries {
 	s := &memSeries{
-		lset:      lset,
-		ref:       id,
-		nextAt:    math.MinInt64,
-		shardHash: shardHash,
+		lset:                           lset,
+		ref:                            id,
+		nextAt:                         math.MinInt64,
+		shardHashOrMemoryMappedMaxTime: shardHash,
 	}
 	if !isolationDisabled {
 		s.txs = newTxRing(0)
@@ -2218,6 +2236,12 @@ func (s *memSeries) truncateChunksBefore(mint int64, minOOOMmapRef chunks.ChunkD
 	return removedInOrder + removedOOO
 }
 
+// shardHash returns the shard hash of the series; it is only valid after WAL replay has completed.
+func (s *memSeries) shardHash() uint64 { return s.shardHashOrMemoryMappedMaxTime }
+
+// mmMaxTime returns the max time of any mmapped chunk in the series; it is only valid during WAL replay.
+func (s *memSeries) mmMaxTime() int64 { return int64(s.shardHashOrMemoryMappedMaxTime) }
+
 // cleanupAppendIDsBelow cleans up older appendIDs. Has to be called after
 // acquiring lock.
 func (s *memSeries) cleanupAppendIDsBelow(bound uint64) {
diff --git a/tsdb/head_read.go b/tsdb/head_read.go
index 9ba8785ad..3a50f316b 100644
--- a/tsdb/head_read.go
+++ b/tsdb/head_read.go
@@ -170,7 +170,7 @@ func (h *headIndexReader) ShardedPostings(p index.Postings, shardIndex, shardCou
 		}
 
 		// Check if the series belong to the shard.
-		if s.shardHash%shardCount != shardIndex {
+		if s.shardHash()%shardCount != shardIndex {
 			continue
 		}
 
diff --git a/tsdb/head_test.go b/tsdb/head_test.go
index c192c8a07..09927c23c 100644
--- a/tsdb/head_test.go
+++ b/tsdb/head_test.go
@@ -23,6 +23,7 @@ import (
 	"path"
 	"path/filepath"
 	"reflect"
+	"runtime/pprof"
 	"sort"
 	"strconv"
 	"strings"
@@ -89,6 +90,43 @@ func newTestHeadWithOptions(t testing.TB, compressWAL wlog.CompressionType, opts
 	return h, wal
 }
 
+// BenchmarkLoadRealWLs will be skipped unless the BENCHMARK_LOAD_REAL_WLS_DIR environment variable is set.
+// BENCHMARK_LOAD_REAL_WLS_DIR should be the folder where `wal` and `chunks_head` (and optionally `wbl`) are located.
+// Optionally, BENCHMARK_LOAD_REAL_WLS_PROFILE can be set to a file path to write a CPU profile.
+func BenchmarkLoadRealWLs(b *testing.B) {
+	dir := os.Getenv("BENCHMARK_LOAD_REAL_WLS_DIR")
+	if dir == "" {
+		b.Skip("BENCHMARK_LOAD_REAL_WLS_DIR is not set")
+	}
+
+	profileFile := os.Getenv("BENCHMARK_LOAD_REAL_WLS_PROFILE")
+	if profileFile != "" {
+		b.Logf("Will profile in %s", profileFile)
+		f, err := os.Create(profileFile)
+		require.NoError(b, err)
+		b.Cleanup(func() { f.Close() })
+		require.NoError(b, pprof.StartCPUProfile(f))
+		b.Cleanup(pprof.StopCPUProfile)
+	}
+
+	wal, err := wlog.New(nil, nil, filepath.Join(dir, "wal"), wlog.CompressionNone)
+	require.NoError(b, err)
+	b.Cleanup(func() { wal.Close() })
+
+	wbl, err := wlog.New(nil, nil, filepath.Join(dir, "wbl"), wlog.CompressionNone)
+	require.NoError(b, err)
+	b.Cleanup(func() { wbl.Close() })
+
+	// Load the WAL.
+	for i := 0; i < b.N; i++ {
+		opts := DefaultHeadOptions()
+		opts.ChunkDirRoot = dir
+		h, err := NewHead(nil, nil, wal, wbl, opts, nil)
+		require.NoError(b, err)
+		require.NoError(b, h.Init(0))
+	}
+}
+
 func BenchmarkCreateSeries(b *testing.B) {
 	series := genSeries(b.N, 10, 0, 0)
 	h, _ := newTestHead(b, 10000, wlog.CompressionNone, false)
diff --git a/tsdb/head_wal.go b/tsdb/head_wal.go
index 787cb7c26..2852709a0 100644
--- a/tsdb/head_wal.go
+++ b/tsdb/head_wal.go
@@ -435,6 +435,10 @@ Outer:
 	return nil
 }
 
+// minInt64 returns math.MinInt64 as a typed value: the untyped constant
+// overflows uint64, so it cannot be converted in a constant expression.
+func minInt64() int64 { return math.MinInt64 }
+
 // resetSeriesWithMMappedChunks is only used during the WAL replay.
 func (h *Head) resetSeriesWithMMappedChunks(mSeries *memSeries, mmc, oooMmc []*mmappedChunk, walSeriesRef chunks.HeadSeriesRef) (overlapped bool) {
 	if mSeries.ref != walSeriesRef {
@@ -481,10 +483,11 @@ func (h *Head) resetSeriesWithMMappedChunks(mSeries *memSeries, mmc, oooMmc []*m
 	}
 	// Cache the last mmapped chunk time, so we can skip calling append() for samples it will reject.
 	if len(mmc) == 0 {
-		mSeries.mmMaxTime = math.MinInt64
+		mSeries.shardHashOrMemoryMappedMaxTime = uint64(minInt64())
 	} else {
-		mSeries.mmMaxTime = mmc[len(mmc)-1].maxTime
-		h.updateMinMaxTime(mmc[0].minTime, mSeries.mmMaxTime)
+		mmMaxTime := mmc[len(mmc)-1].maxTime
+		mSeries.shardHashOrMemoryMappedMaxTime = uint64(mmMaxTime)
+		h.updateMinMaxTime(mmc[0].minTime, mmMaxTime)
 	}
 	if len(oooMmc) != 0 {
 		// Mint and maxt can be in any chunk, they are not sorted.
@@ -585,7 +588,7 @@ func (wp *walSubsetProcessor) processWALSamples(h *Head, mmappedChunks, oooMmapp
 			unknownRefs++
 			continue
 		}
-		if s.T <= ms.mmMaxTime {
+		if s.T <= ms.mmMaxTime() {
 			continue
 		}
 		if _, chunkCreated := ms.append(s.T, s.V, 0, appendChunkOpts); chunkCreated {
@@ -614,7 +617,7 @@ func (wp *walSubsetProcessor) processWALSamples(h *Head, mmappedChunks, oooMmapp
 			unknownHistogramRefs++
 			continue
 		}
-		if s.t <= ms.mmMaxTime {
+		if s.t <= ms.mmMaxTime() {
 			continue
 		}
 		var chunkCreated bool
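Note on the field reuse above: during WAL replay the patch stores an int64 (the max time of the series' mmapped chunks) in the uint64 shardHashOrMemoryMappedMaxTime field and reads it back through mmMaxTime(). The standalone sketch below (hypothetical series type and method names, not Prometheus code) shows why that round trip is safe: int64 -> uint64 -> int64 is a lossless two's-complement reinterpretation, so even the math.MinInt64 sentinel for "no mmapped chunks" survives.

package main

import (
	"fmt"
	"math"
)

// series mimics the memSeries trick: a single 8-byte field holds the shard
// hash after WAL replay, and an int64 max time (as raw bits) during replay.
type series struct {
	shardHashOrMaxTime uint64 // hypothetical stand-in for shardHashOrMemoryMappedMaxTime
}

// setMMMaxTime stores the int64 as uint64 bits; no information is lost.
func (s *series) setMMMaxTime(t int64) { s.shardHashOrMaxTime = uint64(t) }

// mmMaxTime reinterprets the bits back as int64, recovering the original value.
func (s *series) mmMaxTime() int64 { return int64(s.shardHashOrMaxTime) }

func main() {
	var s series

	s.setMMMaxTime(math.MinInt64)               // the "no mmapped chunks" sentinel
	fmt.Println(s.mmMaxTime() == math.MinInt64) // true: the round trip is lossless

	s.setMMMaxTime(1700000000000) // a millisecond timestamp
	fmt.Println(s.mmMaxTime())    // 1700000000000
}

The trade-off is that the field's meaning depends on the replay phase, which is why the patch hides it behind the shardHash() and mmMaxTime() accessors and overwrites it with labels.StableHash(s.lset) once Init() finishes.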