diff --git a/tsdb/db_test.go b/tsdb/db_test.go
index 4a550a5fd..60188a7bd 100644
--- a/tsdb/db_test.go
+++ b/tsdb/db_test.go
@@ -4095,8 +4095,7 @@ func TestOOOCompaction(t *testing.T) {
 		ms, created, err := db.head.getOrCreate(lbls.Hash(), lbls)
 		require.NoError(t, err)
 		require.False(t, created)
-		require.Nil(t, ms.oooHeadChunk)
-		require.Equal(t, 0, len(ms.oooMmappedChunks))
+		require.Nil(t, ms.ooo)
 	}
 	checkEmptyOOOChunk(series1)
 	checkEmptyOOOChunk(series2)
@@ -4138,8 +4137,8 @@ func TestOOOCompaction(t *testing.T) {
 		ms, created, err := db.head.getOrCreate(lbls.Hash(), lbls)
 		require.NoError(t, err)
 		require.False(t, created)
-		require.Greater(t, ms.oooHeadChunk.chunk.NumSamples(), 0)
-		require.Equal(t, 14, len(ms.oooMmappedChunks)) // 7 original, 7 duplicate.
+		require.Greater(t, ms.ooo.oooHeadChunk.chunk.NumSamples(), 0)
+		require.Equal(t, 14, len(ms.ooo.oooMmappedChunks)) // 7 original, 7 duplicate.
 	}
 	checkNonEmptyOOOChunk(series1)
 	checkNonEmptyOOOChunk(series2)
@@ -4278,7 +4277,7 @@ func TestOOOCompactionWithNormalCompaction(t *testing.T) {
 		ms, created, err := db.head.getOrCreate(lbls.Hash(), lbls)
 		require.NoError(t, err)
 		require.False(t, created)
-		require.Greater(t, ms.oooHeadChunk.chunk.NumSamples(), 0)
+		require.Greater(t, ms.ooo.oooHeadChunk.chunk.NumSamples(), 0)
 	}

 	// If the normal Head is not compacted, the OOO head compaction does not take place.
@@ -4306,8 +4305,7 @@ func TestOOOCompactionWithNormalCompaction(t *testing.T) {
 		ms, created, err := db.head.getOrCreate(lbls.Hash(), lbls)
 		require.NoError(t, err)
 		require.False(t, created)
-		require.Nil(t, ms.oooHeadChunk)
-		require.Equal(t, 0, len(ms.oooMmappedChunks))
+		require.Nil(t, ms.ooo)
 	}

 	verifySamples := func(block *Block, fromMins, toMins int64) {
@@ -4700,8 +4698,7 @@ func TestOOODisabled(t *testing.T) {
 	require.NoError(t, err)
 	require.False(t, created)
 	require.NotNil(t, ms)
-	require.Nil(t, ms.oooHeadChunk)
-	require.Len(t, ms.oooMmappedChunks, 0)
+	require.Nil(t, ms.ooo)
 }

 func TestWBLAndMmapReplay(t *testing.T) {
@@ -4765,7 +4762,7 @@ func TestWBLAndMmapReplay(t *testing.T) {
 	require.False(t, created)
 	require.NoError(t, err)
 	var s1MmapSamples []tsdbutil.Sample
-	for _, mc := range ms.oooMmappedChunks {
+	for _, mc := range ms.ooo.oooMmappedChunks {
 		chk, err := db.head.chunkDiskMapper.Chunk(mc.ref)
 		require.NoError(t, err)
 		it := chk.Iterator(nil)
@@ -4972,8 +4969,7 @@ func TestOOOCompactionFailure(t *testing.T) {
 	ms, created, err := db.head.getOrCreate(series1.Hash(), series1)
 	require.NoError(t, err)
 	require.False(t, created)
-	require.Nil(t, ms.oooHeadChunk)
-	require.Len(t, ms.oooMmappedChunks, 0)
+	require.Nil(t, ms.ooo)

 	// The failed compaction should not have left the ooo Head corrupted.
 	// Hence, expect no new blocks with another OOO compaction call.
@@ -5778,7 +5774,7 @@ func TestDiskFillingUpAfterDisablingOOO(t *testing.T) {
 	db.DisableCompactions()

 	ms := db.head.series.getByHash(series1.Hash(), series1)
-	require.Greater(t, len(ms.oooMmappedChunks), 0, "OOO mmap chunk was not replayed")
+	require.Greater(t, len(ms.ooo.oooMmappedChunks), 0, "OOO mmap chunk was not replayed")

 	checkMmapFileContents := func(contains, notContains []string) {
 		mmapDir := mmappedChunksDir(db.head.opts.ChunkDirRoot)
@@ -5806,7 +5802,7 @@ func TestDiskFillingUpAfterDisablingOOO(t *testing.T) {
 	checkMmapFileContents([]string{"000001", "000002"}, nil)
 	require.NoError(t, db.Compact())
 	checkMmapFileContents([]string{"000002"}, []string{"000001"})
-	require.Equal(t, 0, len(ms.oooMmappedChunks), "OOO mmap chunk was not compacted")
+	require.Nil(t, ms.ooo, "OOO mmap chunk was not compacted")

 	addSamples(501, 650)
 	checkMmapFileContents([]string{"000002", "000003"}, []string{"000001"})
diff --git a/tsdb/head.go b/tsdb/head.go
index e951e9ece..6432cd891 100644
--- a/tsdb/head.go
+++ b/tsdb/head.go
@@ -763,7 +763,11 @@ func (h *Head) loadMmappedChunks(refSeries map[chunks.HeadSeriesRef]*memSeries)
 			h.metrics.chunks.Inc()
 			h.metrics.chunksCreated.Inc()

-			ms.oooMmappedChunks = append(ms.oooMmappedChunks, &mmappedChunk{
+			if ms.ooo == nil {
+				ms.ooo = &memSeriesOOOFields{}
+			}
+
+			ms.ooo.oooMmappedChunks = append(ms.ooo.oooMmappedChunks, &mmappedChunk{
 				ref:     chunkRef,
 				minTime: mint,
 				maxTime: maxt,
@@ -1666,24 +1670,24 @@ func (s *stripeSeries) gc(mint int64, minOOOMmapRef chunks.ChunkDiskMapperRef) (
 					minMmapFile = seq
 				}
 			}
-			if len(series.oooMmappedChunks) > 0 {
-				seq, _ := series.oooMmappedChunks[0].ref.Unpack()
+			if series.ooo != nil && len(series.ooo.oooMmappedChunks) > 0 {
+				seq, _ := series.ooo.oooMmappedChunks[0].ref.Unpack()
 				if seq < minMmapFile {
 					minMmapFile = seq
 				}
-				for _, ch := range series.oooMmappedChunks {
+				for _, ch := range series.ooo.oooMmappedChunks {
 					if ch.minTime < minOOOTime {
 						minOOOTime = ch.minTime
 					}
 				}
 			}
-			if series.oooHeadChunk != nil {
-				if series.oooHeadChunk.minTime < minOOOTime {
-					minOOOTime = series.oooHeadChunk.minTime
+			if series.ooo != nil && series.ooo.oooHeadChunk != nil {
+				if series.ooo.oooHeadChunk.minTime < minOOOTime {
+					minOOOTime = series.ooo.oooHeadChunk.minTime
 				}
 			}
-			if len(series.mmappedChunks) > 0 || len(series.oooMmappedChunks) > 0 ||
-				series.headChunk != nil || series.oooHeadChunk != nil || series.pendingCommit {
+			if len(series.mmappedChunks) > 0 || series.headChunk != nil || series.pendingCommit ||
+				(series.ooo != nil && (len(series.ooo.oooMmappedChunks) > 0 || series.ooo.oooHeadChunk != nil)) {
 				seriesMint := series.minTime()
 				if seriesMint < actualMint {
 					actualMint = seriesMint
@@ -1840,9 +1844,7 @@ type memSeries struct {
 	headChunk    *memChunk          // Most recent chunk in memory that's still being built.
 	firstChunkID chunks.HeadChunkID // HeadChunkID for mmappedChunks[0]

-	oooMmappedChunks []*mmappedChunk    // Immutable chunks on disk containing OOO samples.
-	oooHeadChunk     *oooHeadChunk      // Most recent chunk for ooo samples in memory that's still being built.
-	firstOOOChunkID  chunks.HeadChunkID // HeadOOOChunkID for oooMmappedChunks[0]
+	ooo *memSeriesOOOFields

 	mmMaxTime int64 // Max time of any mmapped chunk, only used during WAL replay.

@@ -1866,6 +1868,14 @@ type memSeries struct {
 	pendingCommit bool // Whether there are samples waiting to be committed to this series.
 }

+// memSeriesOOOFields contains the fields required by memSeries
+// to handle out-of-order data.
+type memSeriesOOOFields struct {
+	oooMmappedChunks []*mmappedChunk    // Immutable chunks on disk containing OOO samples.
+	oooHeadChunk     *oooHeadChunk      // Most recent chunk for ooo samples in memory that's still being built.
+	firstOOOChunkID  chunks.HeadChunkID // HeadOOOChunkID for oooMmappedChunks[0].
+}
+
 func newMemSeries(lset labels.Labels, id chunks.HeadSeriesRef, isolationDisabled bool) *memSeries {
 	s := &memSeries{
 		lset: lset,
@@ -1924,15 +1934,19 @@ func (s *memSeries) truncateChunksBefore(mint int64, minOOOMmapRef chunks.ChunkD
 	}

 	var removedOOO int
-	if len(s.oooMmappedChunks) > 0 {
-		for i, c := range s.oooMmappedChunks {
+	if s.ooo != nil && len(s.ooo.oooMmappedChunks) > 0 {
+		for i, c := range s.ooo.oooMmappedChunks {
 			if c.ref.GreaterThan(minOOOMmapRef) {
 				break
 			}
 			removedOOO = i + 1
 		}
-		s.oooMmappedChunks = append(s.oooMmappedChunks[:0], s.oooMmappedChunks[removedOOO:]...)
-		s.firstOOOChunkID += chunks.HeadChunkID(removedOOO)
+		s.ooo.oooMmappedChunks = append(s.ooo.oooMmappedChunks[:0], s.ooo.oooMmappedChunks[removedOOO:]...)
+		s.ooo.firstOOOChunkID += chunks.HeadChunkID(removedOOO)
+
+		if len(s.ooo.oooMmappedChunks) == 0 && s.ooo.oooHeadChunk == nil {
+			s.ooo = nil
+		}
 	}

 	return removedInOrder + removedOOO
diff --git a/tsdb/head_append.go b/tsdb/head_append.go
index 46235ce93..c62f4ffec 100644
--- a/tsdb/head_append.go
+++ b/tsdb/head_append.go
@@ -1093,7 +1093,10 @@ func (a *headAppender) Commit() (err error) {

 // insert is like append, except it inserts. Used for OOO samples.
 func (s *memSeries) insert(t int64, v float64, chunkDiskMapper *chunks.ChunkDiskMapper, oooCapMax int64) (inserted, chunkCreated bool, mmapRef chunks.ChunkDiskMapperRef) {
-	c := s.oooHeadChunk
+	if s.ooo == nil {
+		s.ooo = &memSeriesOOOFields{}
+	}
+	c := s.ooo.oooHeadChunk
 	if c == nil || c.chunk.NumSamples() == int(oooCapMax) {
 		// Note: If no new samples come in then we rely on compaction to clean up stale in-memory OOO chunks.
 		c, mmapRef = s.cutNewOOOHeadChunk(t, chunkDiskMapper)
@@ -1412,33 +1415,35 @@ func (s *memSeries) cutNewHeadChunk(
 	return s.headChunk
 }

+// cutNewOOOHeadChunk cuts a new OOO chunk and m-maps the old chunk.
+// The caller must ensure that s.ooo is not nil.
 func (s *memSeries) cutNewOOOHeadChunk(mint int64, chunkDiskMapper *chunks.ChunkDiskMapper) (*oooHeadChunk, chunks.ChunkDiskMapperRef) {
 	ref := s.mmapCurrentOOOHeadChunk(chunkDiskMapper)

-	s.oooHeadChunk = &oooHeadChunk{
+	s.ooo.oooHeadChunk = &oooHeadChunk{
 		chunk:   NewOOOChunk(),
 		minTime: mint,
 		maxTime: math.MinInt64,
 	}

-	return s.oooHeadChunk, ref
+	return s.ooo.oooHeadChunk, ref
 }

 func (s *memSeries) mmapCurrentOOOHeadChunk(chunkDiskMapper *chunks.ChunkDiskMapper) chunks.ChunkDiskMapperRef {
-	if s.oooHeadChunk == nil {
+	if s.ooo == nil || s.ooo.oooHeadChunk == nil {
 		// There is no head chunk, so nothing to m-map here.
 		return 0
 	}
-	xor, _ := s.oooHeadChunk.chunk.ToXOR() // Encode to XorChunk which is more compact and implements all of the needed functionality.
+	xor, _ := s.ooo.oooHeadChunk.chunk.ToXOR() // Encode to XorChunk which is more compact and implements all of the needed functionality.
 	oooXor := &chunkenc.OOOXORChunk{XORChunk: xor}
-	chunkRef := chunkDiskMapper.WriteChunk(s.ref, s.oooHeadChunk.minTime, s.oooHeadChunk.maxTime, oooXor, handleChunkWriteError)
-	s.oooMmappedChunks = append(s.oooMmappedChunks, &mmappedChunk{
+	chunkRef := chunkDiskMapper.WriteChunk(s.ref, s.ooo.oooHeadChunk.minTime, s.ooo.oooHeadChunk.maxTime, oooXor, handleChunkWriteError)
+	s.ooo.oooMmappedChunks = append(s.ooo.oooMmappedChunks, &mmappedChunk{
 		ref:        chunkRef,
 		numSamples: uint16(xor.NumSamples()),
-		minTime:    s.oooHeadChunk.minTime,
-		maxTime:    s.oooHeadChunk.maxTime,
+		minTime:    s.ooo.oooHeadChunk.minTime,
+		maxTime:    s.ooo.oooHeadChunk.maxTime,
 	})
-	s.oooHeadChunk = nil
+	s.ooo.oooHeadChunk = nil

 	return chunkRef
 }
diff --git a/tsdb/head_read.go b/tsdb/head_read.go
index 4efa2e44b..5d9d980b2 100644
--- a/tsdb/head_read.go
+++ b/tsdb/head_read.go
@@ -196,8 +196,9 @@ func (s *memSeries) headChunkID(pos int) chunks.HeadChunkID {
 // oooHeadChunkID returns the HeadChunkID referred to by the given position.
 // * 0 <= pos < len(s.oooMmappedChunks) refer to s.oooMmappedChunks[pos]
 // * pos == len(s.oooMmappedChunks) refers to s.oooHeadChunk
+// The caller must ensure that s.ooo is not nil.
 func (s *memSeries) oooHeadChunkID(pos int) chunks.HeadChunkID {
-	return chunks.HeadChunkID(pos) + s.firstOOOChunkID
+	return chunks.HeadChunkID(pos) + s.ooo.firstOOOChunkID
 }

 // LabelValueFor returns label value for the given label name in the series referred to by ID.
@@ -349,6 +350,7 @@ func (s *memSeries) chunk(id chunks.HeadChunkID, chunkDiskMapper *chunks.ChunkDi
 // might be a merge of all the overlapping chunks, if any, amongst all the
 // chunks in the OOOHead.
 // This function is not thread safe unless the caller holds a lock.
+// The caller must ensure that s.ooo is not nil.
 func (s *memSeries) oooMergedChunk(meta chunks.Meta, cdm *chunks.ChunkDiskMapper, mint, maxt int64) (chunk *mergedOOOChunks, err error) {
 	_, cid := chunks.HeadChunkRef(meta.Ref).Unpack()

@@ -356,23 +358,23 @@ func (s *memSeries) oooMergedChunk(meta chunks.Meta, cdm *chunks.ChunkDiskMapper
 	// incremented by 1 when new chunk is created, hence (meta - firstChunkID) gives the slice index.
 	// The max index for the s.mmappedChunks slice can be len(s.mmappedChunks)-1, hence if the ix
 	// is len(s.mmappedChunks), it represents the next chunk, which is the head chunk.
-	ix := int(cid) - int(s.firstOOOChunkID)
-	if ix < 0 || ix > len(s.oooMmappedChunks) {
+	ix := int(cid) - int(s.ooo.firstOOOChunkID)
+	if ix < 0 || ix > len(s.ooo.oooMmappedChunks) {
 		return nil, storage.ErrNotFound
 	}

-	if ix == len(s.oooMmappedChunks) {
-		if s.oooHeadChunk == nil {
+	if ix == len(s.ooo.oooMmappedChunks) {
+		if s.ooo.oooHeadChunk == nil {
 			return nil, errors.New("invalid ooo head chunk")
 		}
 	}

 	// We create a temporary slice of chunk metas to hold the information of all
 	// possible chunks that may overlap with the requested chunk.
-	tmpChks := make([]chunkMetaAndChunkDiskMapperRef, 0, len(s.oooMmappedChunks))
+	tmpChks := make([]chunkMetaAndChunkDiskMapperRef, 0, len(s.ooo.oooMmappedChunks))

-	oooHeadRef := chunks.ChunkRef(chunks.NewHeadChunkRef(s.ref, s.oooHeadChunkID(len(s.oooMmappedChunks))))
-	if s.oooHeadChunk != nil && s.oooHeadChunk.OverlapsClosedInterval(mint, maxt) {
+	oooHeadRef := chunks.ChunkRef(chunks.NewHeadChunkRef(s.ref, s.oooHeadChunkID(len(s.ooo.oooMmappedChunks))))
+	if s.ooo.oooHeadChunk != nil && s.ooo.oooHeadChunk.OverlapsClosedInterval(mint, maxt) {
 		// We only want to append the head chunk if this chunk existed when
 		// Series() was called. This brings consistency in case new data
 		// is added in between Series() and Chunk() calls.
@@ -388,7 +390,7 @@ func (s *memSeries) oooMergedChunk(meta chunks.Meta, cdm *chunks.ChunkDiskMapper
 		}
 	}

-	for i, c := range s.oooMmappedChunks {
+	for i, c := range s.ooo.oooMmappedChunks {
 		chunkRef := chunks.ChunkRef(chunks.NewHeadChunkRef(s.ref, s.oooHeadChunkID(i)))
 		// We can skip chunks that came in later than the last known OOOLastRef.
 		if chunkRef > meta.OOOLastRef {
@@ -433,11 +435,11 @@ func (s *memSeries) oooMergedChunk(meta chunks.Meta, cdm *chunks.ChunkDiskMapper
 		// If head chunk min and max time match the meta OOO markers
 		// that means that the chunk has not expanded so we can append
 		// it as it is.
-		if s.oooHeadChunk.minTime == meta.OOOLastMinTime && s.oooHeadChunk.maxTime == meta.OOOLastMaxTime {
-			xor, err = s.oooHeadChunk.chunk.ToXOR() // TODO(jesus.vazquez) (This is an optimization idea that has no priority and might not be that useful) See if we could use a copy of the underlying slice. That would leave the more expensive ToXOR() function only for the usecase where Bytes() is called.
+		if s.ooo.oooHeadChunk.minTime == meta.OOOLastMinTime && s.ooo.oooHeadChunk.maxTime == meta.OOOLastMaxTime {
+			xor, err = s.ooo.oooHeadChunk.chunk.ToXOR() // TODO(jesus.vazquez) (This is an optimization idea that has no priority and might not be that useful) See if we could use a copy of the underlying slice. That would leave the more expensive ToXOR() function only for the usecase where Bytes() is called.
 		} else {
 			// We need to remove samples that are outside of the markers
-			xor, err = s.oooHeadChunk.chunk.ToXORBetweenTimestamps(meta.OOOLastMinTime, meta.OOOLastMaxTime)
+			xor, err = s.ooo.oooHeadChunk.chunk.ToXORBetweenTimestamps(meta.OOOLastMinTime, meta.OOOLastMaxTime)
 		}
 		if err != nil {
 			return nil, errors.Wrap(err, "failed to convert ooo head chunk to xor chunk")
diff --git a/tsdb/head_test.go b/tsdb/head_test.go
index 5e92cd4b8..2a1123db8 100644
--- a/tsdb/head_test.go
+++ b/tsdb/head_test.go
@@ -4008,7 +4008,7 @@ func TestOOOWalReplay(t *testing.T) {
 	require.False(t, ok)
 	require.NotNil(t, ms)

-	xor, err := ms.oooHeadChunk.chunk.ToXOR()
+	xor, err := ms.ooo.oooHeadChunk.chunk.ToXOR()
 	require.NoError(t, err)

 	it := xor.Iterator(nil)
@@ -4068,16 +4068,16 @@ func TestOOOMmapReplay(t *testing.T) {
 	require.False(t, ok)
 	require.NotNil(t, ms)

-	require.Len(t, ms.oooMmappedChunks, 3)
+	require.Len(t, ms.ooo.oooMmappedChunks, 3)
 	// Verify that we can access the chunks without error.
-	for _, m := range ms.oooMmappedChunks {
+	for _, m := range ms.ooo.oooMmappedChunks {
 		chk, err := h.chunkDiskMapper.Chunk(m.ref)
 		require.NoError(t, err)
 		require.Equal(t, int(m.numSamples), chk.NumSamples())
 	}

 	expMmapChunks := make([]*mmappedChunk, 3)
-	copy(expMmapChunks, ms.oooMmappedChunks)
+	copy(expMmapChunks, ms.ooo.oooMmappedChunks)

 	// Restart head.
 	require.NoError(t, h.Close())
@@ -4096,16 +4096,16 @@ func TestOOOMmapReplay(t *testing.T) {
 	require.False(t, ok)
 	require.NotNil(t, ms)

-	require.Len(t, ms.oooMmappedChunks, len(expMmapChunks))
+	require.Len(t, ms.ooo.oooMmappedChunks, len(expMmapChunks))
 	// Verify that we can access the chunks without error.
-	for _, m := range ms.oooMmappedChunks {
+	for _, m := range ms.ooo.oooMmappedChunks {
 		chk, err := h.chunkDiskMapper.Chunk(m.ref)
 		require.NoError(t, err)
 		require.Equal(t, int(m.numSamples), chk.NumSamples())
 	}

 	actMmapChunks := make([]*mmappedChunk, len(expMmapChunks))
-	copy(actMmapChunks, ms.oooMmappedChunks)
+	copy(actMmapChunks, ms.ooo.oooMmappedChunks)

 	require.Equal(t, expMmapChunks, actMmapChunks)

@@ -4500,8 +4500,8 @@ func TestOOOAppendWithNoSeries(t *testing.T) {
 		require.NotNil(t, ms)

 		require.Nil(t, ms.headChunk)
-		require.NotNil(t, ms.oooHeadChunk)
-		require.Equal(t, expSamples, ms.oooHeadChunk.chunk.NumSamples())
+		require.NotNil(t, ms.ooo.oooHeadChunk)
+		require.Equal(t, expSamples, ms.ooo.oooHeadChunk.chunk.NumSamples())
 	}

 	verifyInOrderSamples := func(lbls labels.Labels, expSamples int) {
@@ -4510,7 +4510,7 @@ func TestOOOAppendWithNoSeries(t *testing.T) {
 		require.False(t, created)
 		require.NotNil(t, ms)

-		require.Nil(t, ms.oooHeadChunk)
+		require.Nil(t, ms.ooo)
 		require.NotNil(t, ms.headChunk)
 		require.Equal(t, expSamples, ms.headChunk.chunk.NumSamples())
 	}
diff --git a/tsdb/head_wal.go b/tsdb/head_wal.go
index 97f2dcc14..9bc6985d3 100644
--- a/tsdb/head_wal.go
+++ b/tsdb/head_wal.go
@@ -499,7 +499,15 @@ func (h *Head) resetSeriesWithMMappedChunks(mSeries *memSeries, mmc, oooMmc []*m
 	h.metrics.chunksRemoved.Add(float64(len(mSeries.mmappedChunks)))
 	h.metrics.chunks.Add(float64(len(mmc) + len(oooMmc) - len(mSeries.mmappedChunks)))
 	mSeries.mmappedChunks = mmc
-	mSeries.oooMmappedChunks = oooMmc
+	mSeries.ooo = nil
+	if len(oooMmc) == 0 {
+		mSeries.ooo = nil
+	} else {
+		if mSeries.ooo == nil {
+			mSeries.ooo = &memSeriesOOOFields{}
+		}
+		*mSeries.ooo = memSeriesOOOFields{oooMmappedChunks: oooMmc}
+	}
 	// Cache the last mmapped chunk time, so we can skip calling append() for samples it will reject.
 	if len(mmc) == 0 {
 		mSeries.mmMaxTime = math.MinInt64
@@ -816,7 +824,9 @@ func (h *Head) loadWBL(r *wlog.Reader, multiRef map[chunks.HeadSeriesRef]chunks.
 			// chunk size parameters, we are not taking care of that here.
 			// TODO(codesome): see if there is a way to avoid duplicate m-map chunks if
 			// the size of ooo chunk was reduced between restart.
-			ms.oooHeadChunk = nil
+			if ms.ooo != nil {
+				ms.ooo.oooHeadChunk = nil
+			}

 			processors[idx].mx.Unlock()
 		}
diff --git a/tsdb/ooo_head_read.go b/tsdb/ooo_head_read.go
index 3486083f2..9feb6bc6f 100644
--- a/tsdb/ooo_head_read.go
+++ b/tsdb/ooo_head_read.go
@@ -71,7 +71,11 @@ func (oh *OOOHeadIndexReader) series(ref storage.SeriesRef, builder *labels.Scra
 	defer s.Unlock()
 	*chks = (*chks)[:0]

-	tmpChks := make([]chunks.Meta, 0, len(s.oooMmappedChunks))
+	if s.ooo == nil {
+		return nil
+	}
+
+	tmpChks := make([]chunks.Meta, 0, len(s.ooo.oooMmappedChunks))

 	// We define these markers to track the last chunk reference while we
 	// fill the chunk meta.
@@ -103,15 +107,15 @@ func (oh *OOOHeadIndexReader) series(ref storage.SeriesRef, builder *labels.Scra

 	// Collect all chunks that overlap the query range, in order from most recent to most old,
 	// so we can set the correct markers.
-	if s.oooHeadChunk != nil {
-		c := s.oooHeadChunk
+	if s.ooo.oooHeadChunk != nil {
+		c := s.ooo.oooHeadChunk
 		if c.OverlapsClosedInterval(oh.mint, oh.maxt) && lastMmapRef == 0 {
-			ref := chunks.ChunkRef(chunks.NewHeadChunkRef(s.ref, s.oooHeadChunkID(len(s.oooMmappedChunks))))
+			ref := chunks.ChunkRef(chunks.NewHeadChunkRef(s.ref, s.oooHeadChunkID(len(s.ooo.oooMmappedChunks))))
 			addChunk(c.minTime, c.maxTime, ref)
 		}
 	}
-	for i := len(s.oooMmappedChunks) - 1; i >= 0; i-- {
-		c := s.oooMmappedChunks[i]
+	for i := len(s.ooo.oooMmappedChunks) - 1; i >= 0; i-- {
+		c := s.ooo.oooMmappedChunks[i]
 		if c.OverlapsClosedInterval(oh.mint, oh.maxt) && (lastMmapRef == 0 || lastMmapRef.GreaterThanOrEqualTo(c.ref)) {
 			ref := chunks.ChunkRef(chunks.NewHeadChunkRef(s.ref, s.oooHeadChunkID(i)))
 			addChunk(c.minTime, c.maxTime, ref)
@@ -232,6 +236,11 @@ func (cr OOOHeadChunkReader) Chunk(meta chunks.Meta) (chunkenc.Chunk, error) {
 	}

 	s.Lock()
+	if s.ooo == nil {
+		// There is no OOO data for this series.
+		s.Unlock()
+		return nil, storage.ErrNotFound
+	}
 	c, err := s.oooMergedChunk(meta, cr.head.chunkDiskMapper, cr.mint, cr.maxt)
 	s.Unlock()
 	if err != nil {
@@ -302,18 +311,23 @@ func NewOOOCompactionHead(head *Head) (*OOOCompactionHead, error) {

 		// TODO: consider having a lock specifically for ooo data.
 		ms.Lock()
+		if ms.ooo == nil {
+			ms.Unlock()
+			continue
+		}
+
 		mmapRef := ms.mmapCurrentOOOHeadChunk(head.chunkDiskMapper)
-		if mmapRef == 0 && len(ms.oooMmappedChunks) > 0 {
+		if mmapRef == 0 && len(ms.ooo.oooMmappedChunks) > 0 {
 			// Nothing was m-mapped. So take the mmapRef from the existing slice if it exists.
-			mmapRef = ms.oooMmappedChunks[len(ms.oooMmappedChunks)-1].ref
+			mmapRef = ms.ooo.oooMmappedChunks[len(ms.ooo.oooMmappedChunks)-1].ref
 		}
 		seq, off := mmapRef.Unpack()
 		if seq > lastSeq || (seq == lastSeq && off > lastOff) {
 			ch.lastMmapRef, lastSeq, lastOff = mmapRef, seq, off
 		}
-		if len(ms.oooMmappedChunks) > 0 {
+		if len(ms.ooo.oooMmappedChunks) > 0 {
 			ch.postings = append(ch.postings, seriesRef)
-			for _, c := range ms.oooMmappedChunks {
+			for _, c := range ms.ooo.oooMmappedChunks {
 				if c.minTime < ch.mint {
 					ch.mint = c.minTime
 				}
diff --git a/tsdb/ooo_head_read_test.go b/tsdb/ooo_head_read_test.go
index 4448443c4..177bd2326 100644
--- a/tsdb/ooo_head_read_test.go
+++ b/tsdb/ooo_head_read_test.go
@@ -301,6 +301,7 @@ func TestOOOHeadIndexReader_Series(t *testing.T) {
 				require.NoError(t, h.Init(0))

 				s1, _, _ := h.getOrCreate(s1ID, s1Lset)
+				s1.ooo = &memSeriesOOOFields{}

 				var lastChunk chunkInterval
 				var lastChunkPos int
@@ -340,7 +341,7 @@ func TestOOOHeadIndexReader_Series(t *testing.T) {
 				if headChunk && len(intervals) > 0 {
 					// Put the last interval in the head chunk
-					s1.oooHeadChunk = &oooHeadChunk{
+					s1.ooo.oooHeadChunk = &oooHeadChunk{
 						minTime: intervals[len(intervals)-1].mint,
 						maxTime: intervals[len(intervals)-1].maxt,
 					}
 				}
@@ -348,7 +349,7 @@ func TestOOOHeadIndexReader_Series(t *testing.T) {

 				for _, ic := range intervals {
-					s1.oooMmappedChunks = append(s1.oooMmappedChunks, &mmappedChunk{
+					s1.ooo.oooMmappedChunks = append(s1.ooo.oooMmappedChunks, &mmappedChunk{
 						minTime: ic.mint,
 						maxTime: ic.maxt,
 					})
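
Taken together, the diff above replaces three always-present OOO fields on `memSeries` with a single lazily allocated `ooo *memSeriesOOOFields` pointer: append paths allocate it on the first out-of-order sample, read paths treat `nil` as "no OOO data", and truncation frees it once the last OOO chunk is gone, so in-order-only series pay for one nil pointer instead of the full field set. Below is a minimal standalone Go sketch of that lifecycle; the types and method names are simplified stand-ins chosen for illustration, not the actual Prometheus structures.

```go
package main

import "fmt"

// oooFields groups everything a series only needs once it has received
// out-of-order samples (cf. memSeriesOOOFields in the diff).
type oooFields struct {
	chunks []int // stand-in for oooMmappedChunks
	head   *int  // stand-in for oooHeadChunk
}

type series struct {
	ooo *oooFields // nil while the series has only in-order data
}

// insertOOO allocates s.ooo on first use, as memSeries.insert does.
func (s *series) insertOOO(v int) {
	if s.ooo == nil {
		s.ooo = &oooFields{}
	}
	s.ooo.chunks = append(s.ooo.chunks, v)
}

// numOOOChunks shows the nil-guard every read path now needs, similar to
// OOOHeadChunkReader.Chunk returning ErrNotFound when s.ooo is nil.
func (s *series) numOOOChunks() int {
	if s.ooo == nil {
		return 0
	}
	return len(s.ooo.chunks)
}

// truncateOOO drops the first n OOO chunks and, mirroring
// truncateChunksBefore, releases the sub-struct once nothing OOO remains.
func (s *series) truncateOOO(n int) {
	if s.ooo == nil {
		return
	}
	if n > len(s.ooo.chunks) {
		n = len(s.ooo.chunks)
	}
	s.ooo.chunks = s.ooo.chunks[n:]
	if len(s.ooo.chunks) == 0 && s.ooo.head == nil {
		s.ooo = nil // in-order-only again: hand the memory back
	}
}

func main() {
	s := &series{}
	fmt.Println(s.numOOOChunks()) // 0 — nothing allocated yet
	s.insertOOO(1)
	s.insertOOO(2)
	fmt.Println(s.numOOOChunks()) // 2
	s.truncateOOO(2)
	fmt.Println(s.ooo == nil) // true — allocation released
}
```

The trade-off the pattern makes, visible throughout the hunks above, is that every reader must now guard against a nil pointer, which is why the diff adds "The caller must ensure that s.ooo is not nil" comments to the helpers that skip that check.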