2021-08-03 12:14:26 +00:00
// Copyright 2021 The Prometheus Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package tsdb
import (
"fmt"
"math"
2021-08-06 16:51:01 +00:00
"os"
"path/filepath"
2021-08-03 12:14:26 +00:00
"runtime"
2021-08-06 16:51:01 +00:00
"strconv"
"strings"
2021-08-03 12:14:26 +00:00
"sync"
2021-08-03 14:33:54 +00:00
"time"
2021-08-03 12:14:26 +00:00
"github.com/go-kit/log/level"
"github.com/pkg/errors"
"go.uber.org/atomic"
2021-11-08 14:23:17 +00:00
"github.com/prometheus/prometheus/model/exemplar"
"github.com/prometheus/prometheus/model/labels"
2022-07-19 08:58:52 +00:00
"github.com/prometheus/prometheus/model/metadata"
2021-08-03 12:14:26 +00:00
"github.com/prometheus/prometheus/storage"
2021-10-22 08:19:38 +00:00
"github.com/prometheus/prometheus/tsdb/chunkenc"
2021-11-06 10:10:04 +00:00
"github.com/prometheus/prometheus/tsdb/chunks"
2021-10-22 08:19:38 +00:00
"github.com/prometheus/prometheus/tsdb/encoding"
tsdb_errors "github.com/prometheus/prometheus/tsdb/errors"
"github.com/prometheus/prometheus/tsdb/fileutil"
2021-08-03 12:14:26 +00:00
"github.com/prometheus/prometheus/tsdb/record"
"github.com/prometheus/prometheus/tsdb/tombstones"
"github.com/prometheus/prometheus/tsdb/wal"
)
2022-09-20 17:05:50 +00:00
// loadWAL replays the records read from r into the head.
//
// A decoder goroutine turns raw records into typed slices and pushes them onto
// the decoded channel; this function consumes that channel in order. Series
// records create (or look up) the in-memory series and hand it, together with
// its WAL ref, to the worker owning the shard "series ref % GOMAXPROCS".
// Sample records are split into batches of at most 5000 samples, sharded the
// same way and sent to the workers. Tombstones and metadata are applied
// inline, and exemplars are forwarded to a dedicated goroutine.
//
// multiRef is filled with "WAL ref -> ref of the existing series" for series
// records that did not create a new series, and is used to rewrite the refs of
// replayed samples. mmappedChunks and oooMmappedChunks are passed through to
// the workers.
//
// It returns a *wal.CorruptionErr when a record cannot be decoded, the error
// of getOrCreateWithID when a series cannot be created, or the reader error
// wrapped with "read records".
func ( h * Head ) loadWAL ( r * wal . Reader , multiRef map [ chunks . HeadSeriesRef ] chunks . HeadSeriesRef , mmappedChunks , oooMmappedChunks map [ chunks . HeadSeriesRef ] [ ] * mmappedChunk ) ( err error ) {
2021-08-03 12:14:26 +00:00
// Track number of samples that referenced a series we don't know about
// for error reporting.
var unknownRefs atomic . Uint64
var unknownExemplarRefs atomic . Uint64
2022-07-19 08:58:52 +00:00
var unknownMetadataRefs atomic . Uint64
2021-10-01 11:30:22 +00:00
// Track number of series records that had overlapping m-map chunks.
2022-08-17 13:53:57 +00:00
var mmapOverlappingChunks atomic . Uint64
2021-08-03 12:14:26 +00:00
// Start workers that each process samples for a partition of the series ID space.
var (
wg sync . WaitGroup
n = runtime . GOMAXPROCS ( 0 )
2021-11-25 08:06:14 +00:00
processors = make ( [ ] walSubsetProcessor , n )
2021-08-03 12:14:26 +00:00
exemplarsInput chan record . RefExemplar
dec record . Decoder
shards = make ( [ ] [ ] record . RefSample , n )
decoded = make ( chan interface { } , 10 )
decodeErr , seriesCreationErr error
seriesPool = sync . Pool {
New : func ( ) interface { } {
return [ ] record . RefSeries { }
} ,
}
samplesPool = sync . Pool {
New : func ( ) interface { } {
return [ ] record . RefSample { }
} ,
}
tstonesPool = sync . Pool {
New : func ( ) interface { } {
return [ ] tombstones . Stone { }
} ,
}
exemplarsPool = sync . Pool {
New : func ( ) interface { } {
return [ ] record . RefExemplar { }
} ,
}
2022-07-19 08:58:52 +00:00
metadataPool = sync . Pool {
New : func ( ) interface { } {
return [ ] record . RefMetadata { }
} ,
}
2021-08-03 12:14:26 +00:00
)
// The early-return paths below (decode corruption, series creation failure)
// skip the regular shutdown at the end of the function, so the workers and
// the exemplar goroutine are stopped here instead.
defer func ( ) {
// For CorruptionErr ensure to terminate all workers before exiting.
_ , ok := err . ( * wal . CorruptionErr )
if ok || seriesCreationErr != nil {
for i := 0 ; i < n ; i ++ {
2021-11-25 08:06:14 +00:00
processors [ i ] . closeAndDrain ( )
2021-08-03 12:14:26 +00:00
}
close ( exemplarsInput )
wg . Wait ( )
}
} ( )
wg . Add ( n )
for i := 0 ; i < n ; i ++ {
2021-11-25 08:06:14 +00:00
processors [ i ] . setup ( )
2021-08-03 12:14:26 +00:00
2021-11-25 08:06:14 +00:00
go func ( wp * walSubsetProcessor ) {
2022-09-20 17:05:50 +00:00
unknown , overlapping := wp . processWALSamples ( h , mmappedChunks , oooMmappedChunks )
2021-08-03 12:14:26 +00:00
unknownRefs . Add ( unknown )
2022-08-17 13:53:57 +00:00
mmapOverlappingChunks . Add ( overlapping )
2021-08-03 12:14:26 +00:00
wg . Done ( )
2021-11-25 08:06:14 +00:00
} ( & processors [ i ] )
2021-08-03 12:14:26 +00:00
}
// One extra goroutine applies exemplars; it ends when exemplarsInput is closed.
wg . Add ( 1 )
exemplarsInput = make ( chan record . RefExemplar , 300 )
go func ( input <- chan record . RefExemplar ) {
2021-08-27 06:19:34 +00:00
var err error
2021-08-03 12:14:26 +00:00
defer wg . Done ( )
for e := range input {
ms := h . series . getByID ( e . Ref )
if ms == nil {
unknownExemplarRefs . Inc ( )
continue
}
if e . T < h . minValidTime . Load ( ) {
continue
}
// At the moment the only possible error here is out of order exemplars, which we shouldn't see when
// replaying the WAL, so lets just log the error if it's not that type.
err = h . exemplars . AddExemplar ( ms . lset , exemplar . Exemplar { Ts : e . T , Value : e . V , Labels : e . Labels } )
// NOTE(review): the comment above says to log errors that are NOT
// out-of-order, but this condition logs only when err IS
// storage.ErrOutOfOrderExemplar and silently drops every other error.
// Confirm which of the two is intended.
if err != nil && err == storage . ErrOutOfOrderExemplar {
level . Warn ( h . logger ) . Log ( "msg" , "Unexpected error when replaying WAL on exemplar record" , "err" , err )
}
}
} ( exemplarsInput )
// Decoder goroutine: reads records from r, decodes them into pooled slices
// and sends them on decoded. On a decode failure it records a
// wal.CorruptionErr in decodeErr and stops; decoded is closed on exit, which
// ends the replay loop below.
go func ( ) {
defer close ( decoded )
2022-04-12 10:30:20 +00:00
var err error
2021-08-03 12:14:26 +00:00
for r . Next ( ) {
rec := r . Record ( )
switch dec . Type ( rec ) {
case record . Series :
series := seriesPool . Get ( ) . ( [ ] record . RefSeries ) [ : 0 ]
series , err = dec . Series ( rec , series )
if err != nil {
decodeErr = & wal . CorruptionErr {
Err : errors . Wrap ( err , "decode series" ) ,
Segment : r . Segment ( ) ,
Offset : r . Offset ( ) ,
}
return
}
decoded <- series
case record . Samples :
samples := samplesPool . Get ( ) . ( [ ] record . RefSample ) [ : 0 ]
samples , err = dec . Samples ( rec , samples )
if err != nil {
decodeErr = & wal . CorruptionErr {
Err : errors . Wrap ( err , "decode samples" ) ,
Segment : r . Segment ( ) ,
Offset : r . Offset ( ) ,
}
return
}
decoded <- samples
case record . Tombstones :
tstones := tstonesPool . Get ( ) . ( [ ] tombstones . Stone ) [ : 0 ]
tstones , err = dec . Tombstones ( rec , tstones )
if err != nil {
decodeErr = & wal . CorruptionErr {
Err : errors . Wrap ( err , "decode tombstones" ) ,
Segment : r . Segment ( ) ,
Offset : r . Offset ( ) ,
}
return
}
decoded <- tstones
case record . Exemplars :
exemplars := exemplarsPool . Get ( ) . ( [ ] record . RefExemplar ) [ : 0 ]
exemplars , err = dec . Exemplars ( rec , exemplars )
if err != nil {
decodeErr = & wal . CorruptionErr {
Err : errors . Wrap ( err , "decode exemplars" ) ,
Segment : r . Segment ( ) ,
Offset : r . Offset ( ) ,
}
return
}
decoded <- exemplars
2022-07-19 08:58:52 +00:00
case record . Metadata :
meta := metadataPool . Get ( ) . ( [ ] record . RefMetadata ) [ : 0 ]
// NOTE(review): ":=" declares a new err scoped to this case (meta
// already exists in this scope), unlike the other cases which assign
// to the goroutine-level err. The outcome is the same here.
meta , err := dec . Metadata ( rec , meta )
if err != nil {
decodeErr = & wal . CorruptionErr {
Err : errors . Wrap ( err , "decode metadata" ) ,
Segment : r . Segment ( ) ,
Offset : r . Offset ( ) ,
}
return
}
decoded <- meta
2021-08-03 12:14:26 +00:00
default :
// Noop.
}
}
} ( )
2021-08-03 14:33:54 +00:00
// The records are always replayed from the oldest to the newest.
2021-08-03 12:14:26 +00:00
Outer :
for d := range decoded {
switch v := d . ( type ) {
case [ ] record . RefSeries :
2021-08-03 14:33:54 +00:00
for _ , walSeries := range v {
mSeries , created , err := h . getOrCreateWithID ( walSeries . Ref , walSeries . Labels . Hash ( ) , walSeries . Labels )
2021-08-03 12:14:26 +00:00
if err != nil {
seriesCreationErr = err
break Outer
}
2021-11-06 10:10:04 +00:00
// Keep lastSeriesID at least as high as the largest ref seen in the WAL.
if chunks . HeadSeriesRef ( h . lastSeriesID . Load ( ) ) < walSeries . Ref {
h . lastSeriesID . Store ( uint64 ( walSeries . Ref ) )
2021-08-03 14:33:54 +00:00
}
2022-08-17 13:53:57 +00:00
// The series was not created by this record: remember which ref the
// samples written under the WAL ref have to be redirected to.
if ! created {
multiRef [ walSeries . Ref ] = mSeries . ref
2021-08-03 14:33:54 +00:00
}
2021-08-03 12:14:26 +00:00
2022-08-17 13:53:57 +00:00
// Hand the series to the worker owning its shard; the shard is chosen
// with the same modulo that is used for samples below.
idx := uint64 ( mSeries . ref ) % uint64 ( n )
processors [ idx ] . input <- walSubsetProcessorInputItem { walSeriesRef : walSeries . Ref , existingSeries : mSeries }
2021-08-03 12:14:26 +00:00
}
//nolint:staticcheck // Ignore SA6002 relax staticcheck verification.
seriesPool . Put ( v )
case [ ] record . RefSample :
samples := v
2022-09-15 07:06:57 +00:00
minValidTime := h . minValidTime . Load ( )
2021-08-03 12:14:26 +00:00
// We split up the samples into chunks of 5000 samples or less.
// With O(300 * #cores) in-flight sample batches, large scrapes could otherwise
// cause thousands of very large in flight buffers occupying large amounts
// of unused memory.
for len ( samples ) > 0 {
m := 5000
if len ( samples ) < m {
m = len ( samples )
}
// Shards that were sent in the previous round were set to nil; try to
// pick up a recycled buffer for them.
for i := 0 ; i < n ; i ++ {
2022-09-20 14:13:30 +00:00
if shards [ i ] == nil {
shards [ i ] = processors [ i ] . reuseBuf ( )
}
2021-08-03 12:14:26 +00:00
}
for _ , sam := range samples [ : m ] {
2022-09-15 07:06:57 +00:00
if sam . T < minValidTime {
continue // Before minValidTime: discard.
}
2021-08-03 12:14:26 +00:00
if r , ok := multiRef [ sam . Ref ] ; ok {
sam . Ref = r
}
2021-11-06 10:10:04 +00:00
mod := uint64 ( sam . Ref ) % uint64 ( n )
2021-08-03 12:14:26 +00:00
shards [ mod ] = append ( shards [ mod ] , sam )
}
// Only non-empty shards are sent; the slot is cleared afterwards
// because the worker now owns that buffer.
for i := 0 ; i < n ; i ++ {
2022-09-20 14:13:30 +00:00
if len ( shards [ i ] ) > 0 {
processors [ i ] . input <- walSubsetProcessorInputItem { samples : shards [ i ] }
shards [ i ] = nil
}
2021-08-03 12:14:26 +00:00
}
samples = samples [ m : ]
}
//nolint:staticcheck // Ignore SA6002 relax staticcheck verification.
samplesPool . Put ( v )
case [ ] tombstones . Stone :
for _ , s := range v {
for _ , itv := range s . Intervals {
if itv . Maxt < h . minValidTime . Load ( ) {
continue
}
2021-11-06 10:10:04 +00:00
if m := h . series . getByID ( chunks . HeadSeriesRef ( s . Ref ) ) ; m == nil {
2021-08-03 12:14:26 +00:00
unknownRefs . Inc ( )
continue
}
2021-11-06 10:10:04 +00:00
h . tombstones . AddInterval ( storage . SeriesRef ( s . Ref ) , itv )
2021-08-03 12:14:26 +00:00
}
}
//nolint:staticcheck // Ignore SA6002 relax staticcheck verification.
tstonesPool . Put ( v )
case [ ] record . RefExemplar :
for _ , e := range v {
exemplarsInput <- e
}
//nolint:staticcheck // Ignore SA6002 relax staticcheck verification.
exemplarsPool . Put ( v )
2022-07-19 08:58:52 +00:00
case [ ] record . RefMetadata :
for _ , m := range v {
s := h . series . getByID ( chunks . HeadSeriesRef ( m . Ref ) )
if s == nil {
unknownMetadataRefs . Inc ( )
continue
}
2022-08-17 10:02:28 +00:00
s . meta = & metadata . Metadata {
2022-07-19 08:58:52 +00:00
Type : record . ToTextparseMetricType ( m . Type ) ,
Unit : m . Unit ,
Help : m . Help ,
}
}
//nolint:staticcheck // Ignore SA6002 relax staticcheck verification.
metadataPool . Put ( v )
2021-08-03 12:14:26 +00:00
default :
panic ( fmt . Errorf ( "unexpected decoded type: %T" , d ) )
}
}
// decodeErr is only written by the decoder goroutine before it closes
// decoded, so it is safe to read once the loop above has ended normally.
if decodeErr != nil {
return decodeErr
}
if seriesCreationErr != nil {
// Drain the channel to unblock the goroutine.
for range decoded {
}
return seriesCreationErr
}
// Signal termination to each worker and wait for it to close its output channel.
for i := 0 ; i < n ; i ++ {
2021-11-25 08:06:14 +00:00
processors [ i ] . closeAndDrain ( )
2021-08-03 12:14:26 +00:00
}
close ( exemplarsInput )
wg . Wait ( )
if r . Err ( ) != nil {
return errors . Wrap ( r . Err ( ) , "read records" )
}
2022-07-19 08:58:52 +00:00
if unknownRefs . Load ( ) > 0 || unknownExemplarRefs . Load ( ) > 0 || unknownMetadataRefs . Load ( ) > 0 {
level . Warn ( h . logger ) . Log ( "msg" , "Unknown series references" , "samples" , unknownRefs . Load ( ) , "exemplars" , unknownExemplarRefs . Load ( ) , "metadata" , unknownMetadataRefs . Load ( ) )
2021-08-03 12:14:26 +00:00
}
2022-08-17 13:53:57 +00:00
if count := mmapOverlappingChunks . Load ( ) ; count > 0 {
level . Info ( h . logger ) . Log ( "msg" , "Overlapping m-map chunks on duplicate series records" , "count" , count )
2021-10-01 11:30:22 +00:00
}
2021-08-03 12:14:26 +00:00
return nil
}
2021-11-25 08:06:14 +00:00
// resetSeriesWithMMappedChunks is only used during the WAL replay.
2022-09-20 17:05:50 +00:00
func ( h * Head ) resetSeriesWithMMappedChunks ( mSeries * memSeries , mmc , oooMmc [ ] * mmappedChunk , walSeriesRef chunks . HeadSeriesRef ) ( overlapped bool ) {
2022-08-17 13:53:57 +00:00
if mSeries . ref != walSeriesRef {
// Checking if the new m-mapped chunks overlap with the already existing ones.
if len ( mSeries . mmappedChunks ) > 0 && len ( mmc ) > 0 {
if overlapsClosedInterval (
mSeries . mmappedChunks [ 0 ] . minTime ,
mSeries . mmappedChunks [ len ( mSeries . mmappedChunks ) - 1 ] . maxTime ,
mmc [ 0 ] . minTime ,
mmc [ len ( mmc ) - 1 ] . maxTime ,
) {
level . Debug ( h . logger ) . Log (
"msg" , "M-mapped chunks overlap on a duplicate series record" ,
"series" , mSeries . lset . String ( ) ,
"oldref" , mSeries . ref ,
"oldmint" , mSeries . mmappedChunks [ 0 ] . minTime ,
"oldmaxt" , mSeries . mmappedChunks [ len ( mSeries . mmappedChunks ) - 1 ] . maxTime ,
"newref" , walSeriesRef ,
"newmint" , mmc [ 0 ] . minTime ,
"newmaxt" , mmc [ len ( mmc ) - 1 ] . maxTime ,
)
overlapped = true
}
}
}
2022-09-20 17:05:50 +00:00
h . metrics . chunksCreated . Add ( float64 ( len ( mmc ) + len ( oooMmc ) ) )
2021-08-10 09:23:31 +00:00
h . metrics . chunksRemoved . Add ( float64 ( len ( mSeries . mmappedChunks ) ) )
2022-09-20 17:05:50 +00:00
h . metrics . chunks . Add ( float64 ( len ( mmc ) + len ( oooMmc ) - len ( mSeries . mmappedChunks ) ) )
2021-08-10 09:23:31 +00:00
mSeries . mmappedChunks = mmc
2022-09-20 17:05:50 +00:00
mSeries . oooMmappedChunks = oooMmc
2021-08-10 09:23:31 +00:00
// Cache the last mmapped chunk time, so we can skip calling append() for samples it will reject.
if len ( mmc ) == 0 {
mSeries . mmMaxTime = math . MinInt64
} else {
mSeries . mmMaxTime = mmc [ len ( mmc ) - 1 ] . maxTime
h . updateMinMaxTime ( mmc [ 0 ] . minTime , mSeries . mmMaxTime )
}
2022-09-20 17:05:50 +00:00
if len ( oooMmc ) != 0 {
// Mint and maxt can be in any chunk, they are not sorted.
mint , maxt := int64 ( math . MaxInt64 ) , int64 ( math . MinInt64 )
for _ , ch := range oooMmc {
if ch . minTime < mint {
mint = ch . minTime
}
if ch . maxTime > maxt {
maxt = ch . maxTime
}
}
h . updateMinOOOMaxOOOTime ( mint , maxt )
}
2021-11-25 08:06:14 +00:00
// Any samples replayed till now would already be compacted. Resetting the head chunk.
mSeries . nextAt = 0
mSeries . headChunk = nil
mSeries . app = nil
2022-08-17 13:53:57 +00:00
return
2021-11-25 08:06:14 +00:00
}
type walSubsetProcessor struct {
2022-08-17 13:53:57 +00:00
input chan walSubsetProcessorInputItem
2021-11-25 08:06:14 +00:00
output chan [ ] record . RefSample
}
2022-08-17 13:53:57 +00:00
// walSubsetProcessorInputItem is one unit of work for processWALSamples.
// When existingSeries is non-nil the item asks for that series to be reset
// using the m-mapped chunks stored under walSeriesRef; otherwise samples is
// the batch to append.
type walSubsetProcessorInputItem struct {
// samples is the batch of samples to append; only read when existingSeries is nil.
samples [ ] record . RefSample
// existingSeries is the in-memory series to reset, or nil for a sample batch.
existingSeries * memSeries
// walSeriesRef is the series ref from the WAL record, used to look up m-mapped chunks.
walSeriesRef chunks . HeadSeriesRef
}
2021-11-25 08:06:14 +00:00
func ( wp * walSubsetProcessor ) setup ( ) {
wp . output = make ( chan [ ] record . RefSample , 300 )
2022-08-17 13:53:57 +00:00
wp . input = make ( chan walSubsetProcessorInputItem , 300 )
2021-11-25 08:06:14 +00:00
}
// closeAndDrain closes the input channel and then discards everything that
// arrives on output until output itself is closed.
func (wp *walSubsetProcessor) closeAndDrain() {
	close(wp.input)
	for {
		if _, open := <-wp.output; !open {
			return
		}
	}
}
// If there is a buffer in the output chan, return it for reuse, otherwise return nil.
func ( wp * walSubsetProcessor ) reuseBuf ( ) [ ] record . RefSample {
select {
case buf := <- wp . output :
return buf [ : 0 ]
default :
}
return nil
2021-08-10 09:23:31 +00:00
}
2021-08-10 08:02:42 +00:00
// processWALSamples adds the samples it receives to the head and passes
// the buffer received to an output channel for reuse.
2022-09-20 17:05:50 +00:00
func ( wp * walSubsetProcessor ) processWALSamples ( h * Head , mmappedChunks , oooMmappedChunks map [ chunks . HeadSeriesRef ] [ ] * mmappedChunk ) ( unknownRefs , mmapOverlappingChunks uint64 ) {
2021-11-25 08:06:14 +00:00
defer close ( wp . output )
2021-08-03 12:14:26 +00:00
mint , maxt := int64 ( math . MaxInt64 ) , int64 ( math . MinInt64 )
2022-09-27 08:22:22 +00:00
chunkRange := h . chunkRange . Load ( )
2021-08-03 12:14:26 +00:00
2022-08-17 13:53:57 +00:00
for in := range wp . input {
if in . existingSeries != nil {
mmc := mmappedChunks [ in . walSeriesRef ]
2022-09-20 17:05:50 +00:00
oooMmc := oooMmappedChunks [ in . walSeriesRef ]
if h . resetSeriesWithMMappedChunks ( in . existingSeries , mmc , oooMmc , in . walSeriesRef ) {
2022-08-17 13:53:57 +00:00
mmapOverlappingChunks ++
}
continue
}
for _ , s := range in . samples {
2021-08-10 09:23:31 +00:00
ms := h . series . getByID ( s . Ref )
2021-08-03 12:14:26 +00:00
if ms == nil {
2021-08-10 09:23:31 +00:00
unknownRefs ++
continue
}
if s . T <= ms . mmMaxTime {
continue
2021-08-03 12:14:26 +00:00
}
2022-09-27 08:22:22 +00:00
if _ , chunkCreated := ms . append ( s . T , s . V , 0 , h . chunkDiskMapper , chunkRange ) ; chunkCreated {
2021-08-03 12:14:26 +00:00
h . metrics . chunksCreated . Inc ( )
h . metrics . chunks . Inc ( )
}
if s . T > maxt {
maxt = s . T
}
if s . T < mint {
mint = s . T
}
}
2021-11-25 08:06:14 +00:00
select {
2022-08-17 13:53:57 +00:00
case wp . output <- in . samples :
default :
2021-11-25 08:06:14 +00:00
}
}
2022-08-17 13:53:57 +00:00
h . updateMinMaxTime ( mint , maxt )
return unknownRefs , mmapOverlappingChunks
2021-11-25 08:06:14 +00:00
}
2022-09-20 17:05:50 +00:00
// loadWBL replays the out-of-order write-behind log read from r into the head.
//
// Sample records are split into batches of at most 5000 samples, sharded by
// "series ref % GOMAXPROCS" and inserted by one worker per shard. M-map marker
// records clear the out-of-order head chunk of the referenced series, unless
// the marker points past lastMmapRef. multiRef is consulted to rewrite refs
// found in the log before they are looked up.
//
// A record that fails to decode is returned as a *wal.CorruptionErr wrapped in
// *errLoadWbl; a reader failure is returned wrapped with "read records".
func (h *Head) loadWBL(r *wal.Reader, multiRef map[chunks.HeadSeriesRef]chunks.HeadSeriesRef, lastMmapRef chunks.ChunkDiskMapperRef) (err error) {
	// Track number of samples, m-map markers, that referenced a series we don't know about
	// for error reporting.
	var unknownRefs, mmapMarkerUnknownRefs atomic.Uint64

	lastSeq, lastOff := lastMmapRef.Unpack()
	// Start workers that each process samples for a partition of the series ID space.
	var (
		wg          sync.WaitGroup
		n           = runtime.GOMAXPROCS(0)
		processors  = make([]wblSubsetProcessor, n)
		dec         record.Decoder
		shards      = make([][]record.RefSample, n)
		decodedCh   = make(chan interface{}, 10)
		decodeErr   error
		samplesPool = sync.Pool{
			New: func() interface{} {
				return []record.RefSample{}
			},
		}
		markersPool = sync.Pool{
			New: func() interface{} {
				return []record.RefMmapMarker{}
			},
		}
	)

	defer func() {
		// For CorruptionErr ensure to terminate all workers before exiting.
		// We also wrap it to identify OOO WBL corruption.
		_, ok := err.(*wal.CorruptionErr)
		if ok {
			err = &errLoadWbl{err: err}
			for i := 0; i < n; i++ {
				processors[i].closeAndDrain()
			}
			wg.Wait()
		}
	}()

	wg.Add(n)
	for i := 0; i < n; i++ {
		processors[i].setup()

		go func(wp *wblSubsetProcessor) {
			unknown := wp.processWBLSamples(h)
			unknownRefs.Add(unknown)
			wg.Done()
		}(&processors[i])
	}

	go func() {
		defer close(decodedCh)
		// Use a goroutine-local error, as loadWAL does. Assigning to the named
		// result from here would share that variable with the replay loop and
		// the deferred cleanup above.
		var err error
		for r.Next() {
			rec := r.Record()
			switch dec.Type(rec) {
			case record.Samples:
				samples := samplesPool.Get().([]record.RefSample)[:0]
				samples, err = dec.Samples(rec, samples)
				if err != nil {
					decodeErr = &wal.CorruptionErr{
						Err:     errors.Wrap(err, "decode samples"),
						Segment: r.Segment(),
						Offset:  r.Offset(),
					}
					return
				}
				decodedCh <- samples
			case record.MmapMarkers:
				markers := markersPool.Get().([]record.RefMmapMarker)[:0]
				markers, err = dec.MmapMarkers(rec, markers)
				if err != nil {
					decodeErr = &wal.CorruptionErr{
						Err:     errors.Wrap(err, "decode mmap markers"),
						Segment: r.Segment(),
						Offset:  r.Offset(),
					}
					return
				}
				decodedCh <- markers
			default:
				// Noop.
			}
		}
	}()

	// The records are always replayed from the oldest to the newest.
	for d := range decodedCh {
		switch v := d.(type) {
		case []record.RefSample:
			samples := v
			// We split up the samples into parts of 5000 samples or less.
			// With O(300 * #cores) in-flight sample batches, large scrapes could otherwise
			// cause thousands of very large in flight buffers occupying large amounts
			// of unused memory.
			for len(samples) > 0 {
				m := 5000
				if len(samples) < m {
					m = len(samples)
				}
				for i := 0; i < n; i++ {
					shards[i] = processors[i].reuseBuf()
				}
				for _, sam := range samples[:m] {
					if ref, ok := multiRef[sam.Ref]; ok {
						sam.Ref = ref
					}
					mod := uint64(sam.Ref) % uint64(n)
					shards[mod] = append(shards[mod], sam)
				}
				// Every shard is sent, including empty ones; each worker hands
				// its batch back on its output channel.
				for i := 0; i < n; i++ {
					processors[i].input <- shards[i]
				}
				samples = samples[m:]
			}
			//nolint:staticcheck // Ignore SA6002 relax staticcheck verification.
			samplesPool.Put(d)
		case []record.RefMmapMarker:
			markers := v
			for _, rm := range markers {
				seq, off := rm.MmapRef.Unpack()
				if seq > lastSeq || (seq == lastSeq && off > lastOff) {
					// This m-map chunk from markers was not present during
					// the load of mmapped chunks that happened in the head
					// initialization.
					continue
				}

				if ref, ok := multiRef[rm.Ref]; ok {
					rm.Ref = ref
				}

				ms := h.series.getByID(rm.Ref)
				if ms == nil {
					mmapMarkerUnknownRefs.Inc()
					continue
				}

				idx := uint64(ms.ref) % uint64(n)
				// It is possible that some old sample is being processed in processWBLSamples that
				// could cause race below. So we wait for the goroutine to empty the input buffer and finish
				// processing all old samples after emptying the buffer.
				processors[idx].waitUntilIdle()
				// Lock the subset so we can modify the series object
				processors[idx].mx.Lock()

				// All samples till now have been m-mapped. Hence clear out the headChunk.
				// In case some samples slipped through and went into m-map chunks because of changed
				// chunk size parameters, we are not taking care of that here.
				// TODO(codesome): see if there is a way to avoid duplicate m-map chunks if
				// the size of ooo chunk was reduced between restart.
				ms.oooHeadChunk = nil

				processors[idx].mx.Unlock()
			}
			// The markers were only read by value above, so the slice can go
			// back to the pool; without this the pool is never refilled.
			//nolint:staticcheck // Ignore SA6002 relax staticcheck verification.
			markersPool.Put(d)
		default:
			panic(fmt.Errorf("unexpected decodedCh type: %T", d))
		}
	}

	if decodeErr != nil {
		return decodeErr
	}

	// Signal termination to each worker and wait for it to close its output channel.
	for i := 0; i < n; i++ {
		processors[i].closeAndDrain()
	}
	wg.Wait()

	if r.Err() != nil {
		return errors.Wrap(r.Err(), "read records")
	}

	if unknownRefs.Load() > 0 || mmapMarkerUnknownRefs.Load() > 0 {
		level.Warn(h.logger).Log("msg", "Unknown series references for ooo WAL replay", "samples", unknownRefs.Load(), "mmap_markers", mmapMarkerUnknownRefs.Load())
	}
	return nil
}
// errLoadWbl wraps an error so that it can be recognised with isErrLoadOOOWal.
type errLoadWbl struct {
	err error
}

// Error returns the message of the wrapped error unchanged.
func (w errLoadWbl) Error() string {
	msg := w.err.Error()
	return msg
}

// Cause returns the wrapped error. To support errors.Cause().
func (w errLoadWbl) Cause() error {
	return w.err
}

// Unwrap returns the wrapped error. To support errors.Unwrap().
func (w errLoadWbl) Unwrap() error {
	return w.err
}

// isErrLoadOOOWal reports whether err itself is a *errLoadWbl. Only the
// dynamic type of err is inspected; wrapped errors are not unwrapped.
func isErrLoadOOOWal(err error) bool {
	switch err.(type) {
	case *errLoadWbl:
		return true
	default:
		return false
	}
}
// wblSubsetProcessor holds the state of one WBL replay worker.
type wblSubsetProcessor struct {
mx sync . Mutex // Take this lock while modifying series in the subset.
// input carries the sample batches consumed by processWBLSamples.
input chan [ ] record . RefSample
// output carries processed batches back for reuse (see reuseBuf); it is
// closed when processWBLSamples returns.
output chan [ ] record . RefSample
}
// setup allocates the input and output channels of the processor, each with
// room for 300 buffered batches.
func (wp *wblSubsetProcessor) setup() {
	const chanCap = 300

	wp.input = make(chan []record.RefSample, chanCap)
	wp.output = make(chan []record.RefSample, chanCap)
}
// closeAndDrain closes the input channel and then discards everything that
// arrives on output until output itself is closed.
func (wp *wblSubsetProcessor) closeAndDrain() {
	close(wp.input)
	for {
		if _, open := <-wp.output; !open {
			return
		}
	}
}
// reuseBuf returns a buffer taken from the output channel, truncated to zero
// length, when one is immediately available. It never blocks and returns nil
// when nothing is waiting.
func (wp *wblSubsetProcessor) reuseBuf() []record.RefSample {
	select {
	case recycled := <-wp.output:
		return recycled[:0]
	default:
		return nil
	}
}
// processWBLSamples adds the samples it receives to the head and passes
// the buffer received to an output channel for reuse.
// Samples before the minValidTime timestamp are discarded.
func ( wp * wblSubsetProcessor ) processWBLSamples ( h * Head ) ( unknownRefs uint64 ) {
defer close ( wp . output )
2022-09-27 08:22:22 +00:00
oooCapMax := h . opts . OutOfOrderCapMax . Load ( )
2022-09-20 17:05:50 +00:00
// We don't check for minValidTime for ooo samples.
mint , maxt := int64 ( math . MaxInt64 ) , int64 ( math . MinInt64 )
for samples := range wp . input {
wp . mx . Lock ( )
for _ , s := range samples {
ms := h . series . getByID ( s . Ref )
if ms == nil {
unknownRefs ++
continue
}
2022-09-27 08:22:22 +00:00
ok , chunkCreated , _ := ms . insert ( s . T , s . V , h . chunkDiskMapper , oooCapMax )
2022-09-20 17:05:50 +00:00
if chunkCreated {
h . metrics . chunksCreated . Inc ( )
h . metrics . chunks . Inc ( )
}
if ok {
if s . T < mint {
mint = s . T
}
if s . T > maxt {
maxt = s . T
}
}
}
wp . mx . Unlock ( )
wp . output <- samples
}
h . updateMinOOOMaxOOOTime ( mint , maxt )
return unknownRefs
}
// waitUntilIdle pushes an empty batch onto wp.input and then polls every
// 10 microseconds until the input channel has no buffered items left. While
// waiting it keeps taking at most one batch at a time off wp.output so that
// the worker is not blocked on a full output channel.
func (wp *wblSubsetProcessor) waitUntilIdle() {
	// drainOne removes a single pending batch from output without blocking.
	drainOne := func() {
		select {
		case <-wp.output: // Allow output side to drain to avoid deadlock.
		default:
		}
	}

	drainOne()
	wp.input <- []record.RefSample{}
	for len(wp.input) > 0 {
		time.Sleep(10 * time.Microsecond)
		drainOne()
	}
}
2021-08-06 16:51:01 +00:00
// Record type markers of the chunk snapshot. The encoders in this file write
// one of these as the first byte of a record.
const (
	// chunkSnapshotRecordTypeSeries marks a record holding one series.
	chunkSnapshotRecordTypeSeries uint8 = 1
	// chunkSnapshotRecordTypeTombstones marks the record holding the tombstones.
	chunkSnapshotRecordTypeTombstones uint8 = 2
	// chunkSnapshotRecordTypeExemplars marks a record holding exemplars.
	chunkSnapshotRecordTypeExemplars uint8 = 3
)
type chunkSnapshotRecord struct {
2022-09-27 08:22:22 +00:00
ref chunks . HeadSeriesRef
lset labels . Labels
mc * memChunk
2022-09-27 14:02:05 +00:00
lastValue float64
2021-08-06 16:51:01 +00:00
}
func ( s * memSeries ) encodeToSnapshotRecord ( b [ ] byte ) [ ] byte {
buf := encoding . Encbuf { B : b }
buf . PutByte ( chunkSnapshotRecordTypeSeries )
2021-11-06 10:10:04 +00:00
buf . PutBE64 ( uint64 ( s . ref ) )
2022-07-26 14:42:00 +00:00
record . EncodeLabels ( & buf , s . lset )
2022-09-27 08:22:22 +00:00
buf . PutBE64int64 ( 0 ) // Backwards-compatibility; was chunkRange but now unused.
2021-08-06 16:51:01 +00:00
s . Lock ( )
if s . headChunk == nil {
buf . PutUvarint ( 0 )
} else {
buf . PutUvarint ( 1 )
buf . PutBE64int64 ( s . headChunk . minTime )
buf . PutBE64int64 ( s . headChunk . maxTime )
buf . PutByte ( byte ( s . headChunk . chunk . Encoding ( ) ) )
buf . PutUvarintBytes ( s . headChunk . chunk . Bytes ( ) )
2022-09-27 14:02:05 +00:00
// Backwards compatibility for old sampleBuf which had last 4 samples.
for i := 0 ; i < 3 ; i ++ {
buf . PutBE64int64 ( 0 )
buf . PutBEFloat64 ( 0 )
2021-08-06 16:51:01 +00:00
}
2022-09-27 14:02:05 +00:00
buf . PutBE64int64 ( 0 )
buf . PutBEFloat64 ( s . lastValue )
2021-08-06 16:51:01 +00:00
}
s . Unlock ( )
return buf . Get ( )
}
2022-07-26 14:42:00 +00:00
func decodeSeriesFromChunkSnapshot ( d * record . Decoder , b [ ] byte ) ( csr chunkSnapshotRecord , err error ) {
2021-08-06 16:51:01 +00:00
dec := encoding . Decbuf { B : b }
if flag := dec . Byte ( ) ; flag != chunkSnapshotRecordTypeSeries {
return csr , errors . Errorf ( "invalid record type %x" , flag )
}
2021-11-06 10:10:04 +00:00
csr . ref = chunks . HeadSeriesRef ( dec . Be64 ( ) )
2021-08-06 16:51:01 +00:00
// The label set written to the disk is already sorted.
2022-07-26 14:42:00 +00:00
// TODO: figure out why DecodeLabels calls Sort(), and perhaps remove it.
csr . lset = d . DecodeLabels ( & dec )
2021-08-06 16:51:01 +00:00
2022-09-27 08:22:22 +00:00
_ = dec . Be64int64 ( ) // Was chunkRange but now unused.
2021-08-06 16:51:01 +00:00
if dec . Uvarint ( ) == 0 {
return
}
csr . mc = & memChunk { }
csr . mc . minTime = dec . Be64int64 ( )
csr . mc . maxTime = dec . Be64int64 ( )
enc := chunkenc . Encoding ( dec . Byte ( ) )
// The underlying bytes gets re-used later, so make a copy.
chunkBytes := dec . UvarintBytes ( )
chunkBytesCopy := make ( [ ] byte , len ( chunkBytes ) )
copy ( chunkBytesCopy , chunkBytes )
chk , err := chunkenc . FromData ( enc , chunkBytesCopy )
if err != nil {
return csr , errors . Wrap ( err , "chunk from data" )
}
csr . mc . chunk = chk
2022-09-27 14:02:05 +00:00
// Backwards-compatibility for old sampleBuf which had last 4 samples.
for i := 0 ; i < 3 ; i ++ {
_ = dec . Be64int64 ( )
_ = dec . Be64Float64 ( )
2021-08-06 16:51:01 +00:00
}
2022-09-27 14:02:05 +00:00
_ = dec . Be64int64 ( )
csr . lastValue = dec . Be64Float64 ( )
2021-08-06 16:51:01 +00:00
err = dec . Err ( )
if err != nil && len ( dec . B ) > 0 {
err = errors . Errorf ( "unexpected %d bytes left in entry" , len ( dec . B ) )
}
return
}
// encodeTombstonesToSnapshotRecord serialises the tombstones of tr into a
// chunk snapshot record: the tombstones record type byte followed by the
// length-prefixed output of tombstones.Encode.
func encodeTombstonesToSnapshotRecord(tr tombstones.Reader) ([]byte, error) {
	encoded, err := tombstones.Encode(tr)
	if err != nil {
		return nil, errors.Wrap(err, "encode tombstones")
	}

	var out encoding.Encbuf
	out.PutByte(chunkSnapshotRecordTypeTombstones)
	out.PutUvarintBytes(encoded)
	return out.Get(), nil
}
// decodeTombstonesSnapshotRecord is the inverse of
// encodeTombstonesToSnapshotRecord. It rejects records whose type byte is not
// the tombstones type and otherwise hands the length-prefixed payload to
// tombstones.Decode, wrapping any error it returns.
func decodeTombstonesSnapshotRecord(b []byte) (tombstones.Reader, error) {
	dec := encoding.Decbuf{B: b}

	flag := dec.Byte()
	if flag != chunkSnapshotRecordTypeTombstones {
		return nil, errors.Errorf("invalid record type %x", flag)
	}

	payload := dec.UvarintBytes()
	tr, err := tombstones.Decode(payload)
	if err != nil {
		return tr, errors.Wrap(err, "decode tombstones")
	}
	return tr, nil
}
// chunkSnapshotPrefix matches the leading part of the "chunk_snapshot.N.M"
// directory name described in the ChunkSnapshot documentation.
// NOTE(review): presumably used to build and recognise those directory
// names — confirm against chunkSnapshotDir and LastChunkSnapshot.
const chunkSnapshotPrefix = "chunk_snapshot."
// ChunkSnapshot creates a snapshot of all the series and tombstones in the head.
// It deletes the old chunk snapshots if the chunk snapshot creation is successful.
//
// The chunk snapshot is stored in a directory named chunk_snapshot.N.M and is written
// using the WAL package. N is the last WAL segment present during snapshotting and
// M is the offset in segment N up to which data was written.
2021-08-30 14:04:38 +00:00
//
// The snapshot first contains all series (each in individual records and not sorted), followed by
// tombstones (a single record), and finally exemplars (>= 1 record). Exemplars are in the order they
// were written to the circular buffer.
2021-08-06 16:51:01 +00:00
func ( h * Head ) ChunkSnapshot ( ) ( * ChunkSnapshotStats , error ) {
if h . wal == nil {
// If we are not storing any WAL, does not make sense to take a snapshot too.
level . Warn ( h . logger ) . Log ( "msg" , "skipping chunk snapshotting as WAL is disabled" )
return & ChunkSnapshotStats { } , nil
}
h . chunkSnapshotMtx . Lock ( )
defer h . chunkSnapshotMtx . Unlock ( )
stats := & ChunkSnapshotStats { }
wlast , woffset , err := h . wal . LastSegmentAndOffset ( )
if err != nil && err != record . ErrNotFound {
return stats , errors . Wrap ( err , "get last wal segment and offset" )
}
_ , cslast , csoffset , err := LastChunkSnapshot ( h . opts . ChunkDirRoot )
if err != nil && err != record . ErrNotFound {
return stats , errors . Wrap ( err , "find last chunk snapshot" )
}
if wlast == cslast && woffset == csoffset {
// Nothing has been written to the WAL/Head since the last snapshot.
return stats , nil
}
2021-09-08 14:23:44 +00:00
snapshotName := chunkSnapshotDir ( wlast , woffset )
2021-08-06 16:51:01 +00:00
cpdir := filepath . Join ( h . opts . ChunkDirRoot , snapshotName )
cpdirtmp := cpdir + ".tmp"
stats . Dir = cpdir
2021-10-22 08:06:44 +00:00
if err := os . MkdirAll ( cpdirtmp , 0 o777 ) ; err != nil {
2021-08-06 16:51:01 +00:00
return stats , errors . Wrap ( err , "create chunk snapshot dir" )
}
cp , err := wal . New ( nil , nil , cpdirtmp , h . wal . CompressionEnabled ( ) )
if err != nil {
return stats , errors . Wrap ( err , "open chunk snapshot" )
}
// Ensures that an early return caused by an error doesn't leave any tmp files.
defer func ( ) {
cp . Close ( )
os . RemoveAll ( cpdirtmp )
} ( )
var (
buf [ ] byte
recs [ ] [ ] byte
)
2021-08-30 14:04:38 +00:00
// Add all series to the snapshot.
2021-08-06 16:51:01 +00:00
stripeSize := h . series . size
for i := 0 ; i < stripeSize ; i ++ {
h . series . locks [ i ] . RLock ( )
for _ , s := range h . series . series [ i ] {
start := len ( buf )
buf = s . encodeToSnapshotRecord ( buf )
if len ( buf [ start : ] ) == 0 {
continue // All contents discarded.
}
recs = append ( recs , buf [ start : ] )
// Flush records in 10 MB increments.
if len ( buf ) > 10 * 1024 * 1024 {
if err := cp . Log ( recs ... ) ; err != nil {
h . series . locks [ i ] . RUnlock ( )
return stats , errors . Wrap ( err , "flush records" )
}
buf , recs = buf [ : 0 ] , recs [ : 0 ]
}
}
stats . TotalSeries += len ( h . series . series [ i ] )
h . series . locks [ i ] . RUnlock ( )
}
// Add tombstones to the snapshot.
tombstonesReader , err := h . Tombstones ( )
if err != nil {
return stats , errors . Wrap ( err , "get tombstones" )
}
rec , err := encodeTombstonesToSnapshotRecord ( tombstonesReader )
if err != nil {
return stats , errors . Wrap ( err , "encode tombstones" )
}
recs = append ( recs , rec )
2021-08-30 14:04:38 +00:00
// Flush remaining series records and tombstones.
2021-08-06 16:51:01 +00:00
if err := cp . Log ( recs ... ) ; err != nil {
return stats , errors . Wrap ( err , "flush records" )
}
2021-08-30 14:04:38 +00:00
buf = buf [ : 0 ]
// Add exemplars in the snapshot.
// We log in batches, with each record having upto 10000 exemplars.
// Assuming 100 bytes (overestimate) per exemplar, that's ~1MB.
maxExemplarsPerRecord := 10000
batch := make ( [ ] record . RefExemplar , 0 , maxExemplarsPerRecord )
enc := record . Encoder { }
flushExemplars := func ( ) error {
if len ( batch ) == 0 {
return nil
}
buf = buf [ : 0 ]
encbuf := encoding . Encbuf { B : buf }
encbuf . PutByte ( chunkSnapshotRecordTypeExemplars )
enc . EncodeExemplarsIntoBuffer ( batch , & encbuf )
if err := cp . Log ( encbuf . Get ( ) ) ; err != nil {
return errors . Wrap ( err , "log exemplars" )
}
buf , batch = buf [ : 0 ] , batch [ : 0 ]
return nil
}
err = h . exemplars . IterateExemplars ( func ( seriesLabels labels . Labels , e exemplar . Exemplar ) error {
if len ( batch ) >= maxExemplarsPerRecord {
if err := flushExemplars ( ) ; err != nil {
return errors . Wrap ( err , "flush exemplars" )
}
}
ms := h . series . getByHash ( seriesLabels . Hash ( ) , seriesLabels )
if ms == nil {
// It is possible that exemplar refers to some old series. We discard such exemplars.
return nil
}
batch = append ( batch , record . RefExemplar {
Ref : ms . ref ,
T : e . Ts ,
V : e . Value ,
Labels : e . Labels ,
} )
return nil
} )
if err != nil {
return stats , errors . Wrap ( err , "iterate exemplars" )
}
// Flush remaining exemplars.
if err := flushExemplars ( ) ; err != nil {
return stats , errors . Wrap ( err , "flush exemplars at the end" )
}
2021-08-06 16:51:01 +00:00
if err := cp . Close ( ) ; err != nil {
return stats , errors . Wrap ( err , "close chunk snapshot" )
}
if err := fileutil . Replace ( cpdirtmp , cpdir ) ; err != nil {
return stats , errors . Wrap ( err , "rename chunk snapshot directory" )
}
2021-09-08 14:23:44 +00:00
if err := DeleteChunkSnapshots ( h . opts . ChunkDirRoot , wlast , woffset ) ; err != nil {
2021-08-06 16:51:01 +00:00
// Leftover old chunk snapshots do not cause problems down the line beyond
// occupying disk space.
// They will just be ignored since a higher chunk snapshot exists.
level . Error ( h . logger ) . Log ( "msg" , "delete old chunk snapshots" , "err" , err )
}
return stats , nil
}
2021-09-08 14:23:44 +00:00
// chunkSnapshotDir builds the directory name of a chunk snapshot taken at the
// given WAL segment and offset: chunkSnapshotPrefix, then the segment number
// zero-padded to 6 digits, a dot, and the offset zero-padded to 10 digits.
func chunkSnapshotDir(wlast, woffset int) string {
	layout := chunkSnapshotPrefix + "%06d.%010d"
	return fmt.Sprintf(layout, wlast, woffset)
}
2021-08-06 16:51:01 +00:00
// performChunkSnapshot runs ChunkSnapshot and logs around it: an info line
// before starting and, on success, an info line with the elapsed time, the
// number of series and the snapshot directory. A failure is returned wrapped
// with the "chunk snapshot" message; success returns nil.
func (h *Head) performChunkSnapshot() error {
	level.Info(h.logger).Log("msg", "creating chunk snapshot")

	begin := time.Now()
	stats, err := h.ChunkSnapshot()
	took := time.Since(begin)
	if err != nil {
		return errors.Wrap(err, "chunk snapshot")
	}

	level.Info(h.logger).Log(
		"msg", "chunk snapshot complete",
		"duration", took.String(),
		"num_series", stats.TotalSeries,
		"dir", stats.Dir,
	)
	return nil
}
// ChunkSnapshotStats holds stats about a created chunk snapshot.
// It is the value returned by Head.ChunkSnapshot; it is left at its zero value
// when no snapshot is taken (WAL disabled, or nothing new since the last snapshot).
type ChunkSnapshotStats struct {
	// TotalSeries is the number of in-memory series that were visited while
	// writing the snapshot, summed over all series stripes. Series whose
	// encoded record turned out empty are still counted.
	TotalSeries int
	// Dir is the final path of the snapshot directory, i.e. the chunk
	// directory root joined with the snapshot name.
	Dir string
}
// LastChunkSnapshot returns the directory name and index of the most recent chunk snapshot.
// If dir does not contain any chunk snapshots, ErrNotFound is returned.
func LastChunkSnapshot ( dir string ) ( string , int , int , error ) {
2022-04-27 09:24:36 +00:00
files , err := os . ReadDir ( dir )
2021-08-06 16:51:01 +00:00
if err != nil {
return "" , 0 , 0 , err
}
2021-09-08 14:23:44 +00:00
maxIdx , maxOffset := - 1 , - 1
maxFileName := ""
for i := 0 ; i < len ( files ) ; i ++ {
2021-08-06 16:51:01 +00:00
fi := files [ i ]
if ! strings . HasPrefix ( fi . Name ( ) , chunkSnapshotPrefix ) {
continue
}
if ! fi . IsDir ( ) {
return "" , 0 , 0 , errors . Errorf ( "chunk snapshot %s is not a directory" , fi . Name ( ) )
}
splits := strings . Split ( fi . Name ( ) [ len ( chunkSnapshotPrefix ) : ] , "." )
if len ( splits ) != 2 {
2021-12-08 15:32:14 +00:00
// Chunk snapshots is not in the right format, we do not care about it.
continue
2021-08-06 16:51:01 +00:00
}
idx , err := strconv . Atoi ( splits [ 0 ] )
if err != nil {
continue
}
offset , err := strconv . Atoi ( splits [ 1 ] )
if err != nil {
continue
}
2021-09-08 14:23:44 +00:00
if idx > maxIdx || ( idx == maxIdx && offset > maxOffset ) {
maxIdx , maxOffset = idx , offset
maxFileName = filepath . Join ( dir , fi . Name ( ) )
}
}
if maxFileName == "" {
return "" , 0 , 0 , record . ErrNotFound
2021-08-06 16:51:01 +00:00
}
2021-09-08 14:23:44 +00:00
return maxFileName , maxIdx , maxOffset , nil
2021-08-06 16:51:01 +00:00
}
// DeleteChunkSnapshots deletes all chunk snapshots in a directory below a given index.
func DeleteChunkSnapshots ( dir string , maxIndex , maxOffset int ) error {
2022-04-27 09:24:36 +00:00
files , err := os . ReadDir ( dir )
2021-08-06 16:51:01 +00:00
if err != nil {
return err
}
errs := tsdb_errors . NewMulti ( )
for _ , fi := range files {
if ! strings . HasPrefix ( fi . Name ( ) , chunkSnapshotPrefix ) {
continue
}
splits := strings . Split ( fi . Name ( ) [ len ( chunkSnapshotPrefix ) : ] , "." )
if len ( splits ) != 2 {
continue
}
idx , err := strconv . Atoi ( splits [ 0 ] )
if err != nil {
continue
}
offset , err := strconv . Atoi ( splits [ 1 ] )
if err != nil {
continue
}
2021-09-08 14:23:44 +00:00
if idx < maxIndex || ( idx == maxIndex && offset < maxOffset ) {
2021-08-06 16:51:01 +00:00
if err := os . RemoveAll ( filepath . Join ( dir , fi . Name ( ) ) ) ; err != nil {
errs . Add ( err )
}
}
}
return errs . Err ( )
}
2021-08-17 17:08:16 +00:00
// loadChunkSnapshot replays the chunk snapshot and restores the Head state from it. If there was any error returned,
// it is the responsibility of the caller to clear the contents of the Head.
2021-11-06 10:10:04 +00:00
func ( h * Head ) loadChunkSnapshot ( ) ( int , int , map [ chunks . HeadSeriesRef ] * memSeries , error ) {
2021-08-06 16:51:01 +00:00
dir , snapIdx , snapOffset , err := LastChunkSnapshot ( h . opts . ChunkDirRoot )
if err != nil {
if err == record . ErrNotFound {
return snapIdx , snapOffset , nil , nil
}
return snapIdx , snapOffset , nil , errors . Wrap ( err , "find last chunk snapshot" )
}
start := time . Now ( )
sr , err := wal . NewSegmentsReader ( dir )
if err != nil {
return snapIdx , snapOffset , nil , errors . Wrap ( err , "open chunk snapshot" )
}
defer func ( ) {
if err := sr . Close ( ) ; err != nil {
level . Warn ( h . logger ) . Log ( "msg" , "error while closing the wal segments reader" , "err" , err )
}
} ( )
var (
numSeries = 0
unknownRefs = int64 ( 0 )
n = runtime . GOMAXPROCS ( 0 )
wg sync . WaitGroup
recordChan = make ( chan chunkSnapshotRecord , 5 * n )
2021-11-06 10:10:04 +00:00
shardedRefSeries = make ( [ ] map [ chunks . HeadSeriesRef ] * memSeries , n )
2021-08-06 16:51:01 +00:00
errChan = make ( chan error , n )
2021-11-06 10:10:04 +00:00
refSeries map [ chunks . HeadSeriesRef ] * memSeries
2021-08-30 14:04:38 +00:00
exemplarBuf [ ] record . RefExemplar
dec record . Decoder
2021-08-06 16:51:01 +00:00
)
wg . Add ( n )
for i := 0 ; i < n ; i ++ {
go func ( idx int , rc <- chan chunkSnapshotRecord ) {
defer wg . Done ( )
defer func ( ) {
// If there was an error, drain the channel
// to unblock the main thread.
for range rc {
}
} ( )
2021-11-06 10:10:04 +00:00
shardedRefSeries [ idx ] = make ( map [ chunks . HeadSeriesRef ] * memSeries )
2021-08-06 16:51:01 +00:00
localRefSeries := shardedRefSeries [ idx ]
for csr := range rc {
series , _ , err := h . getOrCreateWithID ( csr . ref , csr . lset . Hash ( ) , csr . lset )
if err != nil {
errChan <- err
return
}
localRefSeries [ csr . ref ] = series
2022-08-04 08:09:14 +00:00
for {
seriesID := uint64 ( series . ref )
lastSeriesID := h . lastSeriesID . Load ( )
2022-09-09 11:28:55 +00:00
if lastSeriesID >= seriesID || h . lastSeriesID . CompareAndSwap ( lastSeriesID , seriesID ) {
2022-08-04 08:09:14 +00:00
break
}
2021-08-06 16:51:01 +00:00
}
if csr . mc == nil {
continue
}
series . nextAt = csr . mc . maxTime // This will create a new chunk on append.
series . headChunk = csr . mc
2022-09-27 14:02:05 +00:00
series . lastValue = csr . lastValue
2021-08-06 16:51:01 +00:00
app , err := series . headChunk . chunk . Appender ( )
if err != nil {
errChan <- err
return
}
series . app = app
h . updateMinMaxTime ( csr . mc . minTime , csr . mc . maxTime )
}
} ( i , recordChan )
}
r := wal . NewReader ( sr )
var loopErr error
Outer :
for r . Next ( ) {
select {
case err := <- errChan :
errChan <- err
break Outer
default :
}
rec := r . Record ( )
switch rec [ 0 ] {
case chunkSnapshotRecordTypeSeries :
numSeries ++
2022-07-26 14:42:00 +00:00
csr , err := decodeSeriesFromChunkSnapshot ( & dec , rec )
2021-08-06 16:51:01 +00:00
if err != nil {
loopErr = errors . Wrap ( err , "decode series record" )
break Outer
}
recordChan <- csr
case chunkSnapshotRecordTypeTombstones :
tr , err := decodeTombstonesSnapshotRecord ( rec )
if err != nil {
loopErr = errors . Wrap ( err , "decode tombstones" )
break Outer
}
2021-11-06 10:10:04 +00:00
if err = tr . Iter ( func ( ref storage . SeriesRef , ivs tombstones . Intervals ) error {
2021-08-06 16:51:01 +00:00
h . tombstones . AddInterval ( ref , ivs ... )
return nil
} ) ; err != nil {
loopErr = errors . Wrap ( err , "iterate tombstones" )
break Outer
}
2021-08-30 14:04:38 +00:00
case chunkSnapshotRecordTypeExemplars :
// Exemplars are at the end of snapshot. So all series are loaded at this point.
if len ( refSeries ) == 0 {
close ( recordChan )
wg . Wait ( )
2021-11-06 10:10:04 +00:00
refSeries = make ( map [ chunks . HeadSeriesRef ] * memSeries , numSeries )
2021-08-30 14:04:38 +00:00
for _ , shard := range shardedRefSeries {
for k , v := range shard {
refSeries [ k ] = v
}
}
}
2021-10-05 05:21:25 +00:00
if ! h . opts . EnableExemplarStorage || h . opts . MaxExemplars . Load ( ) <= 0 {
// Exemplar storage is disabled.
continue Outer
}
2021-08-30 14:04:38 +00:00
decbuf := encoding . Decbuf { B : rec [ 1 : ] }
exemplarBuf = exemplarBuf [ : 0 ]
exemplarBuf , err = dec . ExemplarsFromBuffer ( & decbuf , exemplarBuf )
if err != nil {
loopErr = errors . Wrap ( err , "exemplars from buffer" )
break Outer
}
for _ , e := range exemplarBuf {
ms , ok := refSeries [ e . Ref ]
if ! ok {
unknownRefs ++
continue
}
if err := h . exemplars . AddExemplar ( ms . lset , exemplar . Exemplar {
Labels : e . Labels ,
Value : e . V ,
Ts : e . T ,
} ) ; err != nil {
2021-10-05 05:21:25 +00:00
loopErr = errors . Wrap ( err , "add exemplar" )
2021-08-30 14:04:38 +00:00
break Outer
}
}
2021-08-17 17:08:16 +00:00
default :
// This is a record type we don't understand. It is either and old format from earlier versions,
// or a new format and the code was rolled back to old version.
loopErr = errors . Errorf ( "unsuported snapshot record type 0b%b" , rec [ 0 ] )
2021-08-30 14:04:38 +00:00
break Outer
2021-08-06 16:51:01 +00:00
}
}
2021-08-30 14:04:38 +00:00
if len ( refSeries ) == 0 {
close ( recordChan )
wg . Wait ( )
}
2021-08-06 16:51:01 +00:00
close ( errChan )
merr := tsdb_errors . NewMulti ( errors . Wrap ( loopErr , "decode loop" ) )
for err := range errChan {
merr . Add ( errors . Wrap ( err , "record processing" ) )
}
if err := merr . Err ( ) ; err != nil {
return - 1 , - 1 , nil , err
}
2021-08-17 17:08:16 +00:00
if r . Err ( ) != nil {
return - 1 , - 1 , nil , errors . Wrap ( r . Err ( ) , "read records" )
}
2021-08-30 14:04:38 +00:00
if len ( refSeries ) == 0 {
// We had no exemplar record, so we have to build the map here.
2021-11-06 10:10:04 +00:00
refSeries = make ( map [ chunks . HeadSeriesRef ] * memSeries , numSeries )
2021-08-30 14:04:38 +00:00
for _ , shard := range shardedRefSeries {
for k , v := range shard {
refSeries [ k ] = v
}
2021-08-06 16:51:01 +00:00
}
}
elapsed := time . Since ( start )
level . Info ( h . logger ) . Log ( "msg" , "chunk snapshot loaded" , "dir" , dir , "num_series" , numSeries , "duration" , elapsed . String ( ) )
if unknownRefs > 0 {
level . Warn ( h . logger ) . Log ( "msg" , "unknown series references during chunk snapshot replay" , "count" , unknownRefs )
}
return snapIdx , snapOffset , refSeries , nil
}