diff --git a/tsdb/head_append.go b/tsdb/head_append.go index 62c3727e2..b92fa2e7f 100644 --- a/tsdb/head_append.go +++ b/tsdb/head_append.go @@ -846,16 +846,17 @@ func (a *headAppender) Commit() (err error) { // number of samples rejected due to: out of bounds: with t < minValidTime (OOO support disabled) floatOOBRejected int - inOrderMint int64 = math.MaxInt64 - inOrderMaxt int64 = math.MinInt64 - ooomint int64 = math.MaxInt64 - ooomaxt int64 = math.MinInt64 - wblSamples []record.RefSample - oooMmapMarkers map[chunks.HeadSeriesRef]chunks.ChunkDiskMapperRef - oooRecords [][]byte - oooCapMax = a.head.opts.OutOfOrderCapMax.Load() - series *memSeries - appendChunkOpts = chunkOpts{ + inOrderMint int64 = math.MaxInt64 + inOrderMaxt int64 = math.MinInt64 + ooomint int64 = math.MaxInt64 + ooomaxt int64 = math.MinInt64 + wblSamples []record.RefSample + oooMmapMarkers map[chunks.HeadSeriesRef][]chunks.ChunkDiskMapperRef + oooMmapMarkersCount int + oooRecords [][]byte + oooCapMax = a.head.opts.OutOfOrderCapMax.Load() + series *memSeries + appendChunkOpts = chunkOpts{ chunkDiskMapper: a.head.chunkDiskMapper, chunkRange: a.head.chunkRange.Load(), samplesPerChunk: a.head.opts.SamplesPerChunk, @@ -872,6 +873,7 @@ func (a *headAppender) Commit() (err error) { // WBL is not enabled. So no need to collect. wblSamples = nil oooMmapMarkers = nil + oooMmapMarkersCount = 0 return } // The m-map happens before adding a new sample. So we collect @@ -880,12 +882,14 @@ func (a *headAppender) Commit() (err error) { // WBL Before this Commit(): [old samples before this commit for chunk 1] // WBL After this Commit(): [old samples before this commit for chunk 1][new samples in this commit for chunk 1]mmapmarker1[samples for chunk 2]mmapmarker2[samples for chunk 3] if oooMmapMarkers != nil { - markers := make([]record.RefMmapMarker, 0, len(oooMmapMarkers)) - for ref, mmapRef := range oooMmapMarkers { - markers = append(markers, record.RefMmapMarker{ - Ref: ref, - MmapRef: mmapRef, - }) + markers := make([]record.RefMmapMarker, 0, oooMmapMarkersCount) + for ref, mmapRefs := range oooMmapMarkers { + for _, mmapRef := range mmapRefs { + markers = append(markers, record.RefMmapMarker{ + Ref: ref, + MmapRef: mmapRef, + }) + } } r := enc.MmapMarkers(markers, a.head.getBytesBuffer()) oooRecords = append(oooRecords, r) @@ -928,11 +932,11 @@ func (a *headAppender) Commit() (err error) { case oooSample: // Sample is OOO and OOO handling is enabled // and the delta is within the OOO tolerance. - var mmapRef chunks.ChunkDiskMapperRef - ok, chunkCreated, mmapRef = series.insert(s.T, s.V, a.head.chunkDiskMapper, oooCapMax) + var mmapRefs []chunks.ChunkDiskMapperRef + ok, chunkCreated, mmapRefs = series.insert(s.T, s.V, a.head.chunkDiskMapper, oooCapMax) if chunkCreated { r, ok := oooMmapMarkers[series.ref] - if !ok || r != 0 { + if !ok || r != nil { // !ok means there are no markers collected for these samples yet. So we first flush the samples // before setting this m-map marker. @@ -943,9 +947,16 @@ func (a *headAppender) Commit() (err error) { } if oooMmapMarkers == nil { - oooMmapMarkers = make(map[chunks.HeadSeriesRef]chunks.ChunkDiskMapperRef) + oooMmapMarkers = make(map[chunks.HeadSeriesRef][]chunks.ChunkDiskMapperRef) + } + if len(mmapRefs) > 0 { + oooMmapMarkers[series.ref] = mmapRefs + oooMmapMarkersCount += len(mmapRefs) + } else { + // No chunk was written to disk, so we need to set an initial marker for this series. + oooMmapMarkers[series.ref] = []chunks.ChunkDiskMapperRef{0} + oooMmapMarkersCount++ } - oooMmapMarkers[series.ref] = mmapRef } if ok { wblSamples = append(wblSamples, s) @@ -1069,14 +1080,14 @@ func (a *headAppender) Commit() (err error) { } // insert is like append, except it inserts. Used for OOO samples. -func (s *memSeries) insert(t int64, v float64, chunkDiskMapper *chunks.ChunkDiskMapper, oooCapMax int64) (inserted, chunkCreated bool, mmapRef chunks.ChunkDiskMapperRef) { +func (s *memSeries) insert(t int64, v float64, chunkDiskMapper *chunks.ChunkDiskMapper, oooCapMax int64) (inserted, chunkCreated bool, mmapRefs []chunks.ChunkDiskMapperRef) { if s.ooo == nil { s.ooo = &memSeriesOOOFields{} } c := s.ooo.oooHeadChunk if c == nil || c.chunk.NumSamples() == int(oooCapMax) { // Note: If no new samples come in then we rely on compaction to clean up stale in-memory OOO chunks. - c, mmapRef = s.cutNewOOOHeadChunk(t, chunkDiskMapper) + c, mmapRefs = s.cutNewOOOHeadChunk(t, chunkDiskMapper) chunkCreated = true } @@ -1089,7 +1100,7 @@ func (s *memSeries) insert(t int64, v float64, chunkDiskMapper *chunks.ChunkDisk c.maxTime = t } } - return ok, chunkCreated, mmapRef + return ok, chunkCreated, mmapRefs } // chunkOpts are chunk-level options that are passed when appending to a memSeries. @@ -1431,7 +1442,7 @@ func (s *memSeries) cutNewHeadChunk(mint int64, e chunkenc.Encoding, chunkRange // cutNewOOOHeadChunk cuts a new OOO chunk and m-maps the old chunk. // The caller must ensure that s.ooo is not nil. -func (s *memSeries) cutNewOOOHeadChunk(mint int64, chunkDiskMapper *chunks.ChunkDiskMapper) (*oooHeadChunk, chunks.ChunkDiskMapperRef) { +func (s *memSeries) cutNewOOOHeadChunk(mint int64, chunkDiskMapper *chunks.ChunkDiskMapper) (*oooHeadChunk, []chunks.ChunkDiskMapperRef) { ref := s.mmapCurrentOOOHeadChunk(chunkDiskMapper) s.ooo.oooHeadChunk = &oooHeadChunk{ @@ -1443,21 +1454,29 @@ func (s *memSeries) cutNewOOOHeadChunk(mint int64, chunkDiskMapper *chunks.Chunk return s.ooo.oooHeadChunk, ref } -func (s *memSeries) mmapCurrentOOOHeadChunk(chunkDiskMapper *chunks.ChunkDiskMapper) chunks.ChunkDiskMapperRef { +func (s *memSeries) mmapCurrentOOOHeadChunk(chunkDiskMapper *chunks.ChunkDiskMapper) []chunks.ChunkDiskMapperRef { if s.ooo == nil || s.ooo.oooHeadChunk == nil { - // There is no head chunk, so nothing to m-map here. - return 0 + // OOO is not enabled or there is no head chunk, so nothing to m-map here. + return nil + } + chks, err := s.ooo.oooHeadChunk.chunk.ToEncodedChunks(math.MinInt64, math.MaxInt64) + if err != nil { + handleChunkWriteError(err) + return nil + } + chunkRefs := make([]chunks.ChunkDiskMapperRef, 0, 1) + for _, memchunk := range chks { + chunkRef := chunkDiskMapper.WriteChunk(s.ref, s.ooo.oooHeadChunk.minTime, s.ooo.oooHeadChunk.maxTime, memchunk.chunk, true, handleChunkWriteError) + chunkRefs = append(chunkRefs, chunkRef) + s.ooo.oooMmappedChunks = append(s.ooo.oooMmappedChunks, &mmappedChunk{ + ref: chunkRef, + numSamples: uint16(memchunk.chunk.NumSamples()), + minTime: memchunk.minTime, + maxTime: memchunk.maxTime, + }) } - xor, _ := s.ooo.oooHeadChunk.chunk.ToXOR() // Encode to XorChunk which is more compact and implements all of the needed functionality. - chunkRef := chunkDiskMapper.WriteChunk(s.ref, s.ooo.oooHeadChunk.minTime, s.ooo.oooHeadChunk.maxTime, xor, true, handleChunkWriteError) - s.ooo.oooMmappedChunks = append(s.ooo.oooMmappedChunks, &mmappedChunk{ - ref: chunkRef, - numSamples: uint16(xor.NumSamples()), - minTime: s.ooo.oooHeadChunk.minTime, - maxTime: s.ooo.oooHeadChunk.maxTime, - }) s.ooo.oooHeadChunk = nil - return chunkRef + return chunkRefs } // mmapChunks will m-map all but first chunk on s.headChunks list. diff --git a/tsdb/head_test.go b/tsdb/head_test.go index fa4834516..2456d6a1b 100644 --- a/tsdb/head_test.go +++ b/tsdb/head_test.go @@ -4792,9 +4792,11 @@ func TestWBLReplay(t *testing.T) { require.False(t, ok) require.NotNil(t, ms) - xor, err := ms.ooo.oooHeadChunk.chunk.ToXOR() + chks, err := ms.ooo.oooHeadChunk.chunk.ToEncodedChunks(math.MinInt64, math.MaxInt64) require.NoError(t, err) + require.Len(t, chks, 1) + xor := chks[0].chunk.(*chunkenc.XORChunk) it := xor.Iterator(nil) actOOOSamples := make([]sample, 0, len(expOOOSamples)) for it.Next() == chunkenc.ValFloat { diff --git a/tsdb/ooo_head.go b/tsdb/ooo_head.go index 7f2110fa6..d90163a18 100644 --- a/tsdb/ooo_head.go +++ b/tsdb/ooo_head.go @@ -15,11 +15,11 @@ package tsdb import ( "fmt" + "github.com/prometheus/prometheus/tsdb/chunkenc" "sort" "github.com/oklog/ulid" - "github.com/prometheus/prometheus/tsdb/chunkenc" "github.com/prometheus/prometheus/tsdb/chunks" "github.com/prometheus/prometheus/tsdb/tombstones" ) @@ -74,24 +74,22 @@ func (o *OOOChunk) NumSamples() int { return len(o.samples) } -func (o *OOOChunk) ToXOR() (*chunkenc.XORChunk, error) { - x := chunkenc.NewXORChunk() - app, err := x.Appender() - if err != nil { - return nil, err - } - for _, s := range o.samples { - app.Append(s.t, s.f) - } - return x, nil -} - -func (o *OOOChunk) ToXORBetweenTimestamps(mint, maxt int64) (*chunkenc.XORChunk, error) { - x := chunkenc.NewXORChunk() - app, err := x.Appender() - if err != nil { - return nil, err +// ToEncodedChunks returns chunks with the samples in the OOOChunk. +// +//nolint:revive // unexported-return. +func (o *OOOChunk) ToEncodedChunks(mint, maxt int64) (chks []memChunk, err error) { + if len(o.samples) == 0 { + return nil, nil } + // The most common case is that there will be a single chunk, with the same type of samples in it - this is always true for float samples. + chks = make([]memChunk, 0, 1) + var ( + cmint int64 + cmaxt int64 + chunk chunkenc.Chunk + app chunkenc.Appender + ) + prevEncoding := chunkenc.EncNone // Yes we could call the chunk for this, but this is more efficient. for _, s := range o.samples { if s.t < mint { continue @@ -99,9 +97,77 @@ func (o *OOOChunk) ToXORBetweenTimestamps(mint, maxt int64) (*chunkenc.XORChunk, if s.t > maxt { break } - app.Append(s.t, s.f) + encoding := chunkenc.EncXOR + if s.h != nil { + encoding = chunkenc.EncHistogram + } else if s.fh != nil { + encoding = chunkenc.EncFloatHistogram + } + + // prevApp is the appender for the previous sample. + prevApp := app + + if encoding != prevEncoding { // For the first sample, this will always be true as EncNone != EncXOR | EncHistogram | EncFloatHistogram + if prevEncoding != chunkenc.EncNone { + chks = append(chks, memChunk{chunk, cmint, cmaxt, nil}) + } + cmint = s.t + switch encoding { + case chunkenc.EncXOR: + chunk = chunkenc.NewXORChunk() + case chunkenc.EncHistogram: + chunk = chunkenc.NewHistogramChunk() + case chunkenc.EncFloatHistogram: + chunk = chunkenc.NewFloatHistogramChunk() + default: + chunk = chunkenc.NewXORChunk() + } + app, err = chunk.Appender() + if err != nil { + return + } + } + switch encoding { + case chunkenc.EncXOR: + app.Append(s.t, s.f) + case chunkenc.EncHistogram: + // Ignoring ok is ok, since we don't want to compare to the wrong previous appender anyway. + prevHApp, _ := prevApp.(*chunkenc.HistogramAppender) + var ( + newChunk chunkenc.Chunk + recoded bool + ) + newChunk, recoded, app, _ = app.AppendHistogram(prevHApp, s.t, s.h, false) + if newChunk != nil { // A new chunk was allocated. + if !recoded { + chks = append(chks, memChunk{chunk, cmint, cmaxt, nil}) + } + chunk = newChunk + cmint = s.t + } + case chunkenc.EncFloatHistogram: + // Ignoring ok is ok, since we don't want to compare to the wrong previous appender anyway. + prevHApp, _ := prevApp.(*chunkenc.FloatHistogramAppender) + var ( + newChunk chunkenc.Chunk + recoded bool + ) + newChunk, recoded, app, _ = app.AppendFloatHistogram(prevHApp, s.t, s.fh, false) + if newChunk != nil { // A new chunk was allocated. + if !recoded { + chks = append(chks, memChunk{chunk, cmint, cmaxt, nil}) + } + chunk = newChunk + cmint = s.t + } + } + cmaxt = s.t + prevEncoding = encoding } - return x, nil + if prevEncoding != chunkenc.EncNone { + chks = append(chks, memChunk{chunk, cmint, cmaxt, nil}) + } + return chks, nil } var _ BlockReader = &OOORangeHead{} diff --git a/tsdb/ooo_head_read.go b/tsdb/ooo_head_read.go index 47972c3cc..aedda49dd 100644 --- a/tsdb/ooo_head_read.go +++ b/tsdb/ooo_head_read.go @@ -108,11 +108,19 @@ func (oh *OOOHeadIndexReader) series(ref storage.SeriesRef, builder *labels.Scra c := s.ooo.oooHeadChunk if c.OverlapsClosedInterval(oh.mint, oh.maxt) && maxMmapRef == 0 { ref := chunks.ChunkRef(chunks.NewHeadChunkRef(s.ref, s.oooHeadChunkID(len(s.ooo.oooMmappedChunks)))) - var xor chunkenc.Chunk if len(c.chunk.samples) > 0 { // Empty samples happens in tests, at least. - xor, _ = c.chunk.ToXOR() // Ignoring error because it can't fail. + chks, err := s.ooo.oooHeadChunk.chunk.ToEncodedChunks(c.minTime, c.maxTime) + if err != nil { + handleChunkWriteError(err) + return nil + } + for _, chk := range chks { + addChunk(c.minTime, c.maxTime, ref, chk.chunk) + } + } else { + var enc chunkenc.Chunk + addChunk(c.minTime, c.maxTime, ref, enc) } - addChunk(c.minTime, c.maxTime, ref, xor) } } for i := len(s.ooo.oooMmappedChunks) - 1; i >= 0; i-- { @@ -341,14 +349,15 @@ func NewOOOCompactionHead(ctx context.Context, head *Head) (*OOOCompactionHead, continue } - mmapRef := ms.mmapCurrentOOOHeadChunk(head.chunkDiskMapper) - if mmapRef == 0 && len(ms.ooo.oooMmappedChunks) > 0 { + mmapRefs := ms.mmapCurrentOOOHeadChunk(head.chunkDiskMapper) + if len(mmapRefs) == 0 && len(ms.ooo.oooMmappedChunks) > 0 { // Nothing was m-mapped. So take the mmapRef from the existing slice if it exists. - mmapRef = ms.ooo.oooMmappedChunks[len(ms.ooo.oooMmappedChunks)-1].ref + mmapRefs = []chunks.ChunkDiskMapperRef{ms.ooo.oooMmappedChunks[len(ms.ooo.oooMmappedChunks)-1].ref} } - seq, off := mmapRef.Unpack() + lastMmapRef := mmapRefs[len(mmapRefs)-1] + seq, off := lastMmapRef.Unpack() if seq > lastSeq || (seq == lastSeq && off > lastOff) { - ch.lastMmapRef, lastSeq, lastOff = mmapRef, seq, off + ch.lastMmapRef, lastSeq, lastOff = lastMmapRef, seq, off } if len(ms.ooo.oooMmappedChunks) > 0 { ch.postings = append(ch.postings, seriesRef)