TSDB: Cosmetic: move HeadAndOOO implementations where old code was
This makes the diffs easier to follow. Signed-off-by: Bryan Boreham <bjboreham@gmail.com>
This commit is contained in:
parent
e95607b276
commit
7ffd3ca280
|
@ -30,6 +30,13 @@ import (
|
|||
"github.com/prometheus/prometheus/util/annotations"
|
||||
)
|
||||
|
||||
var _ IndexReader = &HeadAndOOOIndexReader{}
|
||||
|
||||
type HeadAndOOOIndexReader struct {
|
||||
*headIndexReader // A reference to the headIndexReader so we can reuse as many interface implementation as possible.
|
||||
lastGarbageCollectedMmapRef chunks.ChunkDiskMapperRef
|
||||
}
|
||||
|
||||
var _ chunkenc.Iterable = &mergedOOOChunks{}
|
||||
|
||||
// mergedOOOChunks holds the list of iterables for overlapping chunks.
|
||||
|
@ -41,6 +48,39 @@ func (o mergedOOOChunks) Iterator(iterator chunkenc.Iterator) chunkenc.Iterator
|
|||
return storage.ChainSampleIteratorFromIterables(iterator, o.chunkIterables)
|
||||
}
|
||||
|
||||
func NewHeadAndOOOIndexReader(head *Head, mint, maxt int64, lastGarbageCollectedMmapRef chunks.ChunkDiskMapperRef) *HeadAndOOOIndexReader {
|
||||
hr := &headIndexReader{
|
||||
head: head,
|
||||
mint: mint,
|
||||
maxt: maxt,
|
||||
}
|
||||
return &HeadAndOOOIndexReader{hr, lastGarbageCollectedMmapRef}
|
||||
}
|
||||
|
||||
func (oh *HeadAndOOOIndexReader) Series(ref storage.SeriesRef, builder *labels.ScratchBuilder, chks *[]chunks.Meta) error {
|
||||
s := oh.head.series.getByID(chunks.HeadSeriesRef(ref))
|
||||
|
||||
if s == nil {
|
||||
oh.head.metrics.seriesNotFound.Inc()
|
||||
return storage.ErrNotFound
|
||||
}
|
||||
builder.Assign(s.labels())
|
||||
|
||||
if chks == nil {
|
||||
return nil
|
||||
}
|
||||
|
||||
s.Lock()
|
||||
defer s.Unlock()
|
||||
*chks = (*chks)[:0]
|
||||
|
||||
if s.ooo != nil {
|
||||
return getOOOSeriesChunks(s, oh.mint, oh.maxt, oh.lastGarbageCollectedMmapRef, 0, true, chks)
|
||||
}
|
||||
getSeriesChunks(s, oh.mint, oh.maxt, chks)
|
||||
return nil
|
||||
}
|
||||
|
||||
// lastGarbageCollectedMmapRef gives the last mmap chunk that may be being garbage collected and so
|
||||
// any chunk at or before this ref will not be considered. 0 disables this check.
|
||||
//
|
||||
|
@ -126,6 +166,20 @@ func getOOOSeriesChunks(s *memSeries, mint, maxt int64, lastGarbageCollectedMmap
|
|||
return nil
|
||||
}
|
||||
|
||||
// LabelValues needs to be overridden from the headIndexReader implementation
|
||||
// so we can return labels within either in-order range or ooo range.
|
||||
func (oh *HeadAndOOOIndexReader) LabelValues(ctx context.Context, name string, matchers ...*labels.Matcher) ([]string, error) {
|
||||
if oh.maxt < oh.head.MinTime() && oh.maxt < oh.head.MinOOOTime() || oh.mint > oh.head.MaxTime() && oh.mint > oh.head.MaxOOOTime() {
|
||||
return []string{}, nil
|
||||
}
|
||||
|
||||
if len(matchers) == 0 {
|
||||
return oh.head.postings.LabelValues(ctx, name), nil
|
||||
}
|
||||
|
||||
return labelValuesWithMatchers(ctx, oh, name, matchers...)
|
||||
}
|
||||
|
||||
type chunkMetaAndChunkDiskMapperRef struct {
|
||||
meta chunks.Meta
|
||||
ref chunks.ChunkDiskMapperRef
|
||||
|
@ -167,6 +221,64 @@ func lessByMinTimeAndMinRef(a, b chunks.Meta) int {
|
|||
}
|
||||
}
|
||||
|
||||
type HeadAndOOOChunkReader struct {
|
||||
head *Head
|
||||
mint, maxt int64
|
||||
cr *headChunkReader // If nil, only read OOO chunks.
|
||||
maxMmapRef chunks.ChunkDiskMapperRef
|
||||
oooIsoState *oooIsolationState
|
||||
}
|
||||
|
||||
func NewHeadAndOOOChunkReader(head *Head, mint, maxt int64, cr *headChunkReader, oooIsoState *oooIsolationState, maxMmapRef chunks.ChunkDiskMapperRef) *HeadAndOOOChunkReader {
|
||||
return &HeadAndOOOChunkReader{
|
||||
head: head,
|
||||
mint: mint,
|
||||
maxt: maxt,
|
||||
cr: cr,
|
||||
maxMmapRef: maxMmapRef,
|
||||
oooIsoState: oooIsoState,
|
||||
}
|
||||
}
|
||||
|
||||
func (cr *HeadAndOOOChunkReader) ChunkOrIterable(meta chunks.Meta) (chunkenc.Chunk, chunkenc.Iterable, error) {
|
||||
sid, _, isOOO := unpackHeadChunkRef(meta.Ref)
|
||||
if !isOOO {
|
||||
return cr.cr.ChunkOrIterable(meta)
|
||||
}
|
||||
|
||||
s := cr.head.series.getByID(sid)
|
||||
// This means that the series has been garbage collected.
|
||||
if s == nil {
|
||||
return nil, nil, storage.ErrNotFound
|
||||
}
|
||||
|
||||
s.Lock()
|
||||
mc, err := s.oooMergedChunks(meta, cr.head.chunkDiskMapper, cr.cr, cr.mint, cr.maxt, cr.maxMmapRef)
|
||||
s.Unlock()
|
||||
|
||||
return nil, mc, err
|
||||
}
|
||||
|
||||
// Pass through special behaviour for current head chunk.
|
||||
func (cr *HeadAndOOOChunkReader) ChunkOrIterableWithCopy(meta chunks.Meta) (chunkenc.Chunk, chunkenc.Iterable, int64, error) {
|
||||
_, _, isOOO := unpackHeadChunkRef(meta.Ref)
|
||||
if !isOOO {
|
||||
return cr.cr.ChunkOrIterableWithCopy(meta)
|
||||
}
|
||||
chk, iter, err := cr.ChunkOrIterable(meta)
|
||||
return chk, iter, 0, err
|
||||
}
|
||||
|
||||
func (cr *HeadAndOOOChunkReader) Close() error {
|
||||
if cr.cr != nil && cr.cr.isoState != nil {
|
||||
cr.cr.isoState.Close()
|
||||
}
|
||||
if cr.oooIsoState != nil {
|
||||
cr.oooIsoState.Close()
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
type OOOCompactionHead struct {
|
||||
head *Head
|
||||
lastMmapRef chunks.ChunkDiskMapperRef
|
||||
|
@ -473,112 +585,3 @@ func (q *HeadAndOOOChunkQuerier) Close() error {
|
|||
func (q *HeadAndOOOChunkQuerier) Select(ctx context.Context, sortSeries bool, hints *storage.SelectHints, matchers ...*labels.Matcher) storage.ChunkSeriesSet {
|
||||
return selectChunkSeriesSet(ctx, sortSeries, hints, matchers, rangeHeadULID, q.index, q.chunkr, q.head.tombstones, q.mint, q.maxt)
|
||||
}
|
||||
|
||||
type HeadAndOOOIndexReader struct {
|
||||
*headIndexReader // A reference to the headIndexReader so we can reuse as many interface implementation as possible.
|
||||
lastGarbageCollectedMmapRef chunks.ChunkDiskMapperRef
|
||||
}
|
||||
|
||||
func NewHeadAndOOOIndexReader(head *Head, mint, maxt int64, lastGarbageCollectedMmapRef chunks.ChunkDiskMapperRef) *HeadAndOOOIndexReader {
|
||||
hr := &headIndexReader{
|
||||
head: head,
|
||||
mint: mint,
|
||||
maxt: maxt,
|
||||
}
|
||||
return &HeadAndOOOIndexReader{hr, lastGarbageCollectedMmapRef}
|
||||
}
|
||||
|
||||
func (oh *HeadAndOOOIndexReader) Series(ref storage.SeriesRef, builder *labels.ScratchBuilder, chks *[]chunks.Meta) error {
|
||||
s := oh.head.series.getByID(chunks.HeadSeriesRef(ref))
|
||||
if s == nil {
|
||||
oh.head.metrics.seriesNotFound.Inc()
|
||||
return storage.ErrNotFound
|
||||
}
|
||||
builder.Assign(s.labels())
|
||||
|
||||
if chks == nil {
|
||||
return nil
|
||||
}
|
||||
|
||||
s.Lock()
|
||||
defer s.Unlock()
|
||||
*chks = (*chks)[:0]
|
||||
|
||||
if s.ooo != nil {
|
||||
return getOOOSeriesChunks(s, oh.mint, oh.maxt, oh.lastGarbageCollectedMmapRef, 0, true, chks)
|
||||
}
|
||||
getSeriesChunks(s, oh.mint, oh.maxt, chks)
|
||||
return nil
|
||||
}
|
||||
|
||||
// LabelValues needs to be overridden from the headIndexReader implementation
|
||||
// so we can return labels within either in-order range or ooo range.
|
||||
func (oh *HeadAndOOOIndexReader) LabelValues(ctx context.Context, name string, matchers ...*labels.Matcher) ([]string, error) {
|
||||
if oh.maxt < oh.head.MinTime() && oh.maxt < oh.head.MinOOOTime() || oh.mint > oh.head.MaxTime() && oh.mint > oh.head.MaxOOOTime() {
|
||||
return []string{}, nil
|
||||
}
|
||||
|
||||
if len(matchers) == 0 {
|
||||
return oh.head.postings.LabelValues(ctx, name), nil
|
||||
}
|
||||
|
||||
return labelValuesWithMatchers(ctx, oh, name, matchers...)
|
||||
}
|
||||
|
||||
type HeadAndOOOChunkReader struct {
|
||||
head *Head
|
||||
mint, maxt int64
|
||||
cr *headChunkReader // If nil, only read OOO chunks.
|
||||
maxMmapRef chunks.ChunkDiskMapperRef
|
||||
oooIsoState *oooIsolationState
|
||||
}
|
||||
|
||||
func NewHeadAndOOOChunkReader(head *Head, mint, maxt int64, cr *headChunkReader, oooIsoState *oooIsolationState, maxMmapRef chunks.ChunkDiskMapperRef) *HeadAndOOOChunkReader {
|
||||
return &HeadAndOOOChunkReader{
|
||||
head: head,
|
||||
mint: mint,
|
||||
maxt: maxt,
|
||||
cr: cr,
|
||||
maxMmapRef: maxMmapRef,
|
||||
oooIsoState: oooIsoState,
|
||||
}
|
||||
}
|
||||
|
||||
func (cr *HeadAndOOOChunkReader) ChunkOrIterable(meta chunks.Meta) (chunkenc.Chunk, chunkenc.Iterable, error) {
|
||||
sid, _, isOOO := unpackHeadChunkRef(meta.Ref)
|
||||
if !isOOO {
|
||||
return cr.cr.ChunkOrIterable(meta)
|
||||
}
|
||||
|
||||
s := cr.head.series.getByID(sid)
|
||||
// This means that the series has been garbage collected.
|
||||
if s == nil {
|
||||
return nil, nil, storage.ErrNotFound
|
||||
}
|
||||
|
||||
s.Lock()
|
||||
mc, err := s.oooMergedChunks(meta, cr.head.chunkDiskMapper, cr.cr, cr.mint, cr.maxt, cr.maxMmapRef)
|
||||
s.Unlock()
|
||||
|
||||
return nil, mc, err
|
||||
}
|
||||
|
||||
// Pass through special behaviour for current head chunk.
|
||||
func (cr *HeadAndOOOChunkReader) ChunkOrIterableWithCopy(meta chunks.Meta) (chunkenc.Chunk, chunkenc.Iterable, int64, error) {
|
||||
_, _, isOOO := unpackHeadChunkRef(meta.Ref)
|
||||
if !isOOO {
|
||||
return cr.cr.ChunkOrIterableWithCopy(meta)
|
||||
}
|
||||
chk, iter, err := cr.ChunkOrIterable(meta)
|
||||
return chk, iter, 0, err
|
||||
}
|
||||
|
||||
func (cr *HeadAndOOOChunkReader) Close() error {
|
||||
if cr.cr != nil && cr.cr.isoState != nil {
|
||||
cr.cr.isoState.Close()
|
||||
}
|
||||
if cr.oooIsoState != nil {
|
||||
cr.oooIsoState.Close()
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue