mirror of
https://github.com/prometheus/prometheus
synced 2025-02-05 22:56:13 +00:00
TSDB: Review feedback
Signed-off-by: Bryan Boreham <bjboreham@gmail.com> * Re-enable check in `createHeadWithOOOSamples` which wasn't really broken. * Move code making `Block` into a `Queryable` into test file. * Make `getSeriesChunks` return a slice (renamed `appendSeriesChunks`). * Rename `oooMergedChunks` to `mergedChunks`. * Improve comment on `ChunkOrIterableWithCopy`. * Name return values from unpackHeadChunkRef. Co-authored-by: Oleg Zaytsev <mail@olegzaytsev.com> Signed-off-by: Bryan Boreham <bjboreham@gmail.com>
This commit is contained in:
parent
7ffd3ca280
commit
9135da1e4f
@ -467,11 +467,6 @@ func (pb *Block) setCompactionFailed() error {
|
|||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// Querier implements Queryable.
|
|
||||||
func (pb *Block) Querier(mint, maxt int64) (storage.Querier, error) {
|
|
||||||
return NewBlockQuerier(pb, mint, maxt)
|
|
||||||
}
|
|
||||||
|
|
||||||
type blockIndexReader struct {
|
type blockIndexReader struct {
|
||||||
ir IndexReader
|
ir IndexReader
|
||||||
b *Block
|
b *Block
|
||||||
|
@ -199,19 +199,18 @@ func (h *headIndexReader) Series(ref storage.SeriesRef, builder *labels.ScratchB
|
|||||||
defer s.Unlock()
|
defer s.Unlock()
|
||||||
|
|
||||||
*chks = (*chks)[:0]
|
*chks = (*chks)[:0]
|
||||||
|
*chks = appendSeriesChunks(s, h.mint, h.maxt, *chks)
|
||||||
getSeriesChunks(s, h.mint, h.maxt, chks)
|
|
||||||
|
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func getSeriesChunks(s *memSeries, mint, maxt int64, chks *[]chunks.Meta) {
|
func appendSeriesChunks(s *memSeries, mint, maxt int64, chks []chunks.Meta) []chunks.Meta {
|
||||||
for i, c := range s.mmappedChunks {
|
for i, c := range s.mmappedChunks {
|
||||||
// Do not expose chunks that are outside of the specified range.
|
// Do not expose chunks that are outside of the specified range.
|
||||||
if !c.OverlapsClosedInterval(mint, maxt) {
|
if !c.OverlapsClosedInterval(mint, maxt) {
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
*chks = append(*chks, chunks.Meta{
|
chks = append(chks, chunks.Meta{
|
||||||
MinTime: c.minTime,
|
MinTime: c.minTime,
|
||||||
MaxTime: c.maxTime,
|
MaxTime: c.maxTime,
|
||||||
Ref: chunks.ChunkRef(chunks.NewHeadChunkRef(s.ref, s.headChunkID(i))),
|
Ref: chunks.ChunkRef(chunks.NewHeadChunkRef(s.ref, s.headChunkID(i))),
|
||||||
@ -230,7 +229,7 @@ func getSeriesChunks(s *memSeries, mint, maxt int64, chks *[]chunks.Meta) {
|
|||||||
maxTime = chk.maxTime
|
maxTime = chk.maxTime
|
||||||
}
|
}
|
||||||
if chk.OverlapsClosedInterval(mint, maxt) {
|
if chk.OverlapsClosedInterval(mint, maxt) {
|
||||||
*chks = append(*chks, chunks.Meta{
|
chks = append(chks, chunks.Meta{
|
||||||
MinTime: chk.minTime,
|
MinTime: chk.minTime,
|
||||||
MaxTime: maxTime,
|
MaxTime: maxTime,
|
||||||
Ref: chunks.ChunkRef(chunks.NewHeadChunkRef(s.ref, s.headChunkID(len(s.mmappedChunks)+j))),
|
Ref: chunks.ChunkRef(chunks.NewHeadChunkRef(s.ref, s.headChunkID(len(s.mmappedChunks)+j))),
|
||||||
@ -239,6 +238,7 @@ func getSeriesChunks(s *memSeries, mint, maxt int64, chks *[]chunks.Meta) {
|
|||||||
j++
|
j++
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
return chks
|
||||||
}
|
}
|
||||||
|
|
||||||
// headChunkID returns the HeadChunkID referred to by the given position.
|
// headChunkID returns the HeadChunkID referred to by the given position.
|
||||||
@ -259,7 +259,7 @@ func (s *memSeries) oooHeadChunkID(pos int) chunks.HeadChunkID {
|
|||||||
return (chunks.HeadChunkID(pos) + s.ooo.firstOOOChunkID) | oooChunkIDMask
|
return (chunks.HeadChunkID(pos) + s.ooo.firstOOOChunkID) | oooChunkIDMask
|
||||||
}
|
}
|
||||||
|
|
||||||
func unpackHeadChunkRef(ref chunks.ChunkRef) (chunks.HeadSeriesRef, chunks.HeadChunkID, bool) {
|
func unpackHeadChunkRef(ref chunks.ChunkRef) (seriesID chunks.HeadSeriesRef, chunkID chunks.HeadChunkID, isOOO bool) {
|
||||||
sid, cid := chunks.HeadChunkRef(ref).Unpack()
|
sid, cid := chunks.HeadChunkRef(ref).Unpack()
|
||||||
return sid, (cid & (oooChunkIDMask - 1)), (cid & oooChunkIDMask) != 0
|
return sid, (cid & (oooChunkIDMask - 1)), (cid & oooChunkIDMask) != 0
|
||||||
}
|
}
|
||||||
@ -481,14 +481,14 @@ func (s *memSeries) chunk(id chunks.HeadChunkID, chunkDiskMapper *chunks.ChunkDi
|
|||||||
return elem, true, offset == 0, nil
|
return elem, true, offset == 0, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// oooMergedChunks return an iterable over one or more OOO chunks for the given
|
// mergedChunks return an iterable over one or more OOO chunks for the given
|
||||||
// chunks.Meta reference from memory or by m-mapping it from the disk. The
|
// chunks.Meta reference from memory or by m-mapping it from the disk. The
|
||||||
// returned iterable will be a merge of all the overlapping chunks, if any,
|
// returned iterable will be a merge of all the overlapping chunks, if any,
|
||||||
// amongst all the chunks in the OOOHead.
|
// amongst all the chunks in the OOOHead.
|
||||||
// If hr is non-nil then in-order chunks are included.
|
// If hr is non-nil then in-order chunks are included.
|
||||||
// This function is not thread safe unless the caller holds a lock.
|
// This function is not thread safe unless the caller holds a lock.
|
||||||
// The caller must ensure that s.ooo is not nil.
|
// The caller must ensure that s.ooo is not nil.
|
||||||
func (s *memSeries) oooMergedChunks(meta chunks.Meta, cdm *chunks.ChunkDiskMapper, hr *headChunkReader, mint, maxt int64, maxMmapRef chunks.ChunkDiskMapperRef) (*mergedOOOChunks, error) {
|
func (s *memSeries) mergedChunks(meta chunks.Meta, cdm *chunks.ChunkDiskMapper, hr *headChunkReader, mint, maxt int64, maxMmapRef chunks.ChunkDiskMapperRef) (chunkenc.Iterable, error) {
|
||||||
_, cid, _ := unpackHeadChunkRef(meta.Ref)
|
_, cid, _ := unpackHeadChunkRef(meta.Ref)
|
||||||
|
|
||||||
// ix represents the index of chunk in the s.mmappedChunks slice. The chunk meta's are
|
// ix represents the index of chunk in the s.mmappedChunks slice. The chunk meta's are
|
||||||
@ -531,8 +531,7 @@ func (s *memSeries) oooMergedChunks(meta chunks.Meta, cdm *chunks.ChunkDiskMappe
|
|||||||
}
|
}
|
||||||
|
|
||||||
if hr != nil { // Include in-order chunks.
|
if hr != nil { // Include in-order chunks.
|
||||||
var metas []chunks.Meta
|
metas := appendSeriesChunks(s, max(meta.MinTime, mint), min(meta.MaxTime, maxt), nil)
|
||||||
getSeriesChunks(s, max(meta.MinTime, mint), min(meta.MaxTime, maxt), &metas)
|
|
||||||
for _, m := range metas {
|
for _, m := range metas {
|
||||||
tmpChks = append(tmpChks, chunkMetaAndChunkDiskMapperRef{
|
tmpChks = append(tmpChks, chunkMetaAndChunkDiskMapperRef{
|
||||||
meta: m,
|
meta: m,
|
||||||
|
@ -77,7 +77,7 @@ func (oh *HeadAndOOOIndexReader) Series(ref storage.SeriesRef, builder *labels.S
|
|||||||
if s.ooo != nil {
|
if s.ooo != nil {
|
||||||
return getOOOSeriesChunks(s, oh.mint, oh.maxt, oh.lastGarbageCollectedMmapRef, 0, true, chks)
|
return getOOOSeriesChunks(s, oh.mint, oh.maxt, oh.lastGarbageCollectedMmapRef, 0, true, chks)
|
||||||
}
|
}
|
||||||
getSeriesChunks(s, oh.mint, oh.maxt, chks)
|
*chks = appendSeriesChunks(s, oh.mint, oh.maxt, *chks)
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -127,7 +127,7 @@ func getOOOSeriesChunks(s *memSeries, mint, maxt int64, lastGarbageCollectedMmap
|
|||||||
}
|
}
|
||||||
|
|
||||||
if includeInOrder {
|
if includeInOrder {
|
||||||
getSeriesChunks(s, mint, maxt, &tmpChks)
|
tmpChks = appendSeriesChunks(s, mint, maxt, tmpChks)
|
||||||
}
|
}
|
||||||
|
|
||||||
// There is nothing to do if we did not collect any chunk.
|
// There is nothing to do if we did not collect any chunk.
|
||||||
@ -253,13 +253,14 @@ func (cr *HeadAndOOOChunkReader) ChunkOrIterable(meta chunks.Meta) (chunkenc.Chu
|
|||||||
}
|
}
|
||||||
|
|
||||||
s.Lock()
|
s.Lock()
|
||||||
mc, err := s.oooMergedChunks(meta, cr.head.chunkDiskMapper, cr.cr, cr.mint, cr.maxt, cr.maxMmapRef)
|
mc, err := s.mergedChunks(meta, cr.head.chunkDiskMapper, cr.cr, cr.mint, cr.maxt, cr.maxMmapRef)
|
||||||
s.Unlock()
|
s.Unlock()
|
||||||
|
|
||||||
return nil, mc, err
|
return nil, mc, err
|
||||||
}
|
}
|
||||||
|
|
||||||
// Pass through special behaviour for current head chunk.
|
// ChunkOrIterableWithCopy: implements ChunkReaderWithCopy. The special Copy behaviour
|
||||||
|
// is only implemented for the in-order head chunk.
|
||||||
func (cr *HeadAndOOOChunkReader) ChunkOrIterableWithCopy(meta chunks.Meta) (chunkenc.Chunk, chunkenc.Iterable, int64, error) {
|
func (cr *HeadAndOOOChunkReader) ChunkOrIterableWithCopy(meta chunks.Meta) (chunkenc.Chunk, chunkenc.Iterable, int64, error) {
|
||||||
_, _, isOOO := unpackHeadChunkRef(meta.Ref)
|
_, _, isOOO := unpackHeadChunkRef(meta.Ref)
|
||||||
if !isOOO {
|
if !isOOO {
|
||||||
|
@ -321,10 +321,17 @@ func BenchmarkQuerierSelect(b *testing.B) {
|
|||||||
require.NoError(b, block.Close())
|
require.NoError(b, block.Close())
|
||||||
}()
|
}()
|
||||||
|
|
||||||
benchmarkSelect(b, block, numSeries, false)
|
benchmarkSelect(b, (*queryableBlock)(block), numSeries, false)
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Type wrapper to let a Block be a Queryable in benchmarkSelect().
|
||||||
|
type queryableBlock Block
|
||||||
|
|
||||||
|
func (pb *queryableBlock) Querier(mint, maxt int64) (storage.Querier, error) {
|
||||||
|
return NewBlockQuerier((*Block)(pb), mint, maxt)
|
||||||
|
}
|
||||||
|
|
||||||
func BenchmarkQuerierSelectWithOutOfOrder(b *testing.B) {
|
func BenchmarkQuerierSelectWithOutOfOrder(b *testing.B) {
|
||||||
numSeries := 1000000
|
numSeries := 1000000
|
||||||
_, db := createHeadForBenchmarkSelect(b, numSeries, func(app storage.Appender, i int) {
|
_, db := createHeadForBenchmarkSelect(b, numSeries, func(app storage.Appender, i int) {
|
||||||
|
Loading…
Reference in New Issue
Block a user