mirror of
https://github.com/prometheus/prometheus
synced 2025-01-10 00:19:34 +00:00
7a8bb8222c
A lot of this code was hacked together, literally during a hackathon. This commit intends not to change the code substantially, but just make the code obey the usual style practices. A (possibly incomplete) list of areas: * Generally address linter warnings. * The `pgk` directory is deprecated as per dev-summit. No new packages should be added to it. I moved the new `pkg/histogram` package to `model` anticipating what's proposed in #9478. * Make the naming of the Sparse Histogram more consistent. Including abbreviations, there were just too many names for it: SparseHistogram, Histogram, Histo, hist, his, shs, h. The idea is to call it "Histogram" in general. Only add "Sparse" if it is needed to avoid confusion with conventional Histograms (which is rare because the TSDB really has no notion of conventional Histograms). Use abbreviations only in local scope, and then really abbreviate (not just removing three out of seven letters like in "Histo"). This is in the spirit of https://github.com/golang/go/wiki/CodeReviewComments#variable-names * Several other minor name changes. * A lot of formatting of doc comments. For one, following https://github.com/golang/go/wiki/CodeReviewComments#comment-sentences , but also layout question, anticipating how things will look like when rendered by `godoc` (even where `godoc` doesn't render them right now because they are for unexported types or not a doc comment at all but just a normal code comment - consistency is queen!). * Re-enabled `TestQueryLog` and `TestEndopints` (they pass now, leaving them disabled was presumably an oversight). * Bucket iterator for histogram.Histogram is now created with a method. * HistogramChunk.iterator now allows iterator recycling. (I think @dieterbe only commented it out because he was confused by the question in the comment.) * HistogramAppender.Append panics now because we decided to treat staleness marker differently. Signed-off-by: beorn7 <beorn@grafana.com>
956 lines
24 KiB
Go
956 lines
24 KiB
Go
// Copyright 2017 The Prometheus Authors
|
|
// Licensed under the Apache License, Version 2.0 (the "License");
|
|
// you may not use this file except in compliance with the License.
|
|
// You may obtain a copy of the License at
|
|
//
|
|
// http://www.apache.org/licenses/LICENSE-2.0
|
|
//
|
|
// Unless required by applicable law or agreed to in writing, software
|
|
// distributed under the License is distributed on an "AS IS" BASIS,
|
|
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
// See the License for the specific language governing permissions and
|
|
// limitations under the License.
|
|
|
|
package tsdb
|
|
|
|
import (
|
|
"math"
|
|
"sort"
|
|
"strings"
|
|
"unicode/utf8"
|
|
|
|
"github.com/pkg/errors"
|
|
|
|
"github.com/prometheus/prometheus/model/histogram"
|
|
"github.com/prometheus/prometheus/pkg/labels"
|
|
"github.com/prometheus/prometheus/storage"
|
|
"github.com/prometheus/prometheus/tsdb/chunkenc"
|
|
"github.com/prometheus/prometheus/tsdb/chunks"
|
|
tsdb_errors "github.com/prometheus/prometheus/tsdb/errors"
|
|
"github.com/prometheus/prometheus/tsdb/index"
|
|
"github.com/prometheus/prometheus/tsdb/tombstones"
|
|
)
|
|
|
|
// Bitmap used by func isRegexMetaCharacter to check whether a character needs to be escaped.
|
|
var regexMetaCharacterBytes [16]byte
|
|
|
|
// isRegexMetaCharacter reports whether byte b needs to be escaped.
|
|
func isRegexMetaCharacter(b byte) bool {
|
|
return b < utf8.RuneSelf && regexMetaCharacterBytes[b%16]&(1<<(b/16)) != 0
|
|
}
|
|
|
|
func init() {
|
|
for _, b := range []byte(`.+*?()|[]{}^$`) {
|
|
regexMetaCharacterBytes[b%16] |= 1 << (b / 16)
|
|
}
|
|
}
|
|
|
|
type blockBaseQuerier struct {
|
|
index IndexReader
|
|
chunks ChunkReader
|
|
tombstones tombstones.Reader
|
|
|
|
closed bool
|
|
|
|
mint, maxt int64
|
|
}
|
|
|
|
func newBlockBaseQuerier(b BlockReader, mint, maxt int64) (*blockBaseQuerier, error) {
|
|
indexr, err := b.Index()
|
|
if err != nil {
|
|
return nil, errors.Wrap(err, "open index reader")
|
|
}
|
|
chunkr, err := b.Chunks()
|
|
if err != nil {
|
|
indexr.Close()
|
|
return nil, errors.Wrap(err, "open chunk reader")
|
|
}
|
|
tombsr, err := b.Tombstones()
|
|
if err != nil {
|
|
indexr.Close()
|
|
chunkr.Close()
|
|
return nil, errors.Wrap(err, "open tombstone reader")
|
|
}
|
|
|
|
if tombsr == nil {
|
|
tombsr = tombstones.NewMemTombstones()
|
|
}
|
|
return &blockBaseQuerier{
|
|
mint: mint,
|
|
maxt: maxt,
|
|
index: indexr,
|
|
chunks: chunkr,
|
|
tombstones: tombsr,
|
|
}, nil
|
|
}
|
|
|
|
func (q *blockBaseQuerier) LabelValues(name string, matchers ...*labels.Matcher) ([]string, storage.Warnings, error) {
|
|
res, err := q.index.SortedLabelValues(name, matchers...)
|
|
return res, nil, err
|
|
}
|
|
|
|
func (q *blockBaseQuerier) LabelNames(matchers ...*labels.Matcher) ([]string, storage.Warnings, error) {
|
|
res, err := q.index.LabelNames(matchers...)
|
|
return res, nil, err
|
|
}
|
|
|
|
func (q *blockBaseQuerier) Close() error {
|
|
if q.closed {
|
|
return errors.New("block querier already closed")
|
|
}
|
|
|
|
errs := tsdb_errors.NewMulti(
|
|
q.index.Close(),
|
|
q.chunks.Close(),
|
|
q.tombstones.Close(),
|
|
)
|
|
q.closed = true
|
|
return errs.Err()
|
|
}
|
|
|
|
type blockQuerier struct {
|
|
*blockBaseQuerier
|
|
}
|
|
|
|
// NewBlockQuerier returns a querier against the block reader and requested min and max time range.
|
|
func NewBlockQuerier(b BlockReader, mint, maxt int64) (storage.Querier, error) {
|
|
q, err := newBlockBaseQuerier(b, mint, maxt)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
return &blockQuerier{blockBaseQuerier: q}, nil
|
|
}
|
|
|
|
func (q *blockQuerier) Select(sortSeries bool, hints *storage.SelectHints, ms ...*labels.Matcher) storage.SeriesSet {
|
|
mint := q.mint
|
|
maxt := q.maxt
|
|
p, err := PostingsForMatchers(q.index, ms...)
|
|
if err != nil {
|
|
return storage.ErrSeriesSet(err)
|
|
}
|
|
if sortSeries {
|
|
p = q.index.SortedPostings(p)
|
|
}
|
|
|
|
if hints != nil {
|
|
mint = hints.Start
|
|
maxt = hints.End
|
|
if hints.Func == "series" {
|
|
// When you're only looking up metadata (for example series API), you don't need to load any chunks.
|
|
return newBlockSeriesSet(q.index, newNopChunkReader(), q.tombstones, p, mint, maxt)
|
|
}
|
|
}
|
|
|
|
return newBlockSeriesSet(q.index, q.chunks, q.tombstones, p, mint, maxt)
|
|
}
|
|
|
|
// blockChunkQuerier provides chunk querying access to a single block database.
|
|
type blockChunkQuerier struct {
|
|
*blockBaseQuerier
|
|
}
|
|
|
|
// NewBlockChunkQuerier returns a chunk querier against the block reader and requested min and max time range.
|
|
func NewBlockChunkQuerier(b BlockReader, mint, maxt int64) (storage.ChunkQuerier, error) {
|
|
q, err := newBlockBaseQuerier(b, mint, maxt)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
return &blockChunkQuerier{blockBaseQuerier: q}, nil
|
|
}
|
|
|
|
func (q *blockChunkQuerier) Select(sortSeries bool, hints *storage.SelectHints, ms ...*labels.Matcher) storage.ChunkSeriesSet {
|
|
mint := q.mint
|
|
maxt := q.maxt
|
|
if hints != nil {
|
|
mint = hints.Start
|
|
maxt = hints.End
|
|
}
|
|
p, err := PostingsForMatchers(q.index, ms...)
|
|
if err != nil {
|
|
return storage.ErrChunkSeriesSet(err)
|
|
}
|
|
if sortSeries {
|
|
p = q.index.SortedPostings(p)
|
|
}
|
|
return newBlockChunkSeriesSet(q.index, q.chunks, q.tombstones, p, mint, maxt)
|
|
}
|
|
|
|
func findSetMatches(pattern string) []string {
|
|
// Return empty matches if the wrapper from Prometheus is missing.
|
|
if len(pattern) < 6 || pattern[:4] != "^(?:" || pattern[len(pattern)-2:] != ")$" {
|
|
return nil
|
|
}
|
|
escaped := false
|
|
sets := []*strings.Builder{{}}
|
|
for i := 4; i < len(pattern)-2; i++ {
|
|
if escaped {
|
|
switch {
|
|
case isRegexMetaCharacter(pattern[i]):
|
|
sets[len(sets)-1].WriteByte(pattern[i])
|
|
case pattern[i] == '\\':
|
|
sets[len(sets)-1].WriteByte('\\')
|
|
default:
|
|
return nil
|
|
}
|
|
escaped = false
|
|
} else {
|
|
switch {
|
|
case isRegexMetaCharacter(pattern[i]):
|
|
if pattern[i] == '|' {
|
|
sets = append(sets, &strings.Builder{})
|
|
} else {
|
|
return nil
|
|
}
|
|
case pattern[i] == '\\':
|
|
escaped = true
|
|
default:
|
|
sets[len(sets)-1].WriteByte(pattern[i])
|
|
}
|
|
}
|
|
}
|
|
matches := make([]string, 0, len(sets))
|
|
for _, s := range sets {
|
|
if s.Len() > 0 {
|
|
matches = append(matches, s.String())
|
|
}
|
|
}
|
|
return matches
|
|
}
|
|
|
|
// PostingsForMatchers assembles a single postings iterator against the index reader
|
|
// based on the given matchers. The resulting postings are not ordered by series.
|
|
func PostingsForMatchers(ix IndexReader, ms ...*labels.Matcher) (index.Postings, error) {
|
|
var its, notIts []index.Postings
|
|
// See which label must be non-empty.
|
|
// Optimization for case like {l=~".", l!="1"}.
|
|
labelMustBeSet := make(map[string]bool, len(ms))
|
|
for _, m := range ms {
|
|
if !m.Matches("") {
|
|
labelMustBeSet[m.Name] = true
|
|
}
|
|
}
|
|
|
|
for _, m := range ms {
|
|
if labelMustBeSet[m.Name] {
|
|
// If this matcher must be non-empty, we can be smarter.
|
|
matchesEmpty := m.Matches("")
|
|
isNot := m.Type == labels.MatchNotEqual || m.Type == labels.MatchNotRegexp
|
|
if isNot && matchesEmpty { // l!="foo"
|
|
// If the label can't be empty and is a Not and the inner matcher
|
|
// doesn't match empty, then subtract it out at the end.
|
|
inverse, err := m.Inverse()
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
it, err := postingsForMatcher(ix, inverse)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
notIts = append(notIts, it)
|
|
} else if isNot && !matchesEmpty { // l!=""
|
|
// If the label can't be empty and is a Not, but the inner matcher can
|
|
// be empty we need to use inversePostingsForMatcher.
|
|
inverse, err := m.Inverse()
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
it, err := inversePostingsForMatcher(ix, inverse)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
its = append(its, it)
|
|
} else { // l="a"
|
|
// Non-Not matcher, use normal postingsForMatcher.
|
|
it, err := postingsForMatcher(ix, m)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
its = append(its, it)
|
|
}
|
|
} else { // l=""
|
|
// If the matchers for a labelname selects an empty value, it selects all
|
|
// the series which don't have the label name set too. See:
|
|
// https://github.com/prometheus/prometheus/issues/3575 and
|
|
// https://github.com/prometheus/prometheus/pull/3578#issuecomment-351653555
|
|
it, err := inversePostingsForMatcher(ix, m)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
notIts = append(notIts, it)
|
|
}
|
|
}
|
|
|
|
// If there's nothing to subtract from, add in everything and remove the notIts later.
|
|
if len(its) == 0 && len(notIts) != 0 {
|
|
k, v := index.AllPostingsKey()
|
|
allPostings, err := ix.Postings(k, v)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
its = append(its, allPostings)
|
|
}
|
|
|
|
it := index.Intersect(its...)
|
|
|
|
for _, n := range notIts {
|
|
it = index.Without(it, n)
|
|
}
|
|
|
|
return it, nil
|
|
}
|
|
|
|
func postingsForMatcher(ix IndexReader, m *labels.Matcher) (index.Postings, error) {
|
|
// This method will not return postings for missing labels.
|
|
|
|
// Fast-path for equal matching.
|
|
if m.Type == labels.MatchEqual {
|
|
return ix.Postings(m.Name, m.Value)
|
|
}
|
|
|
|
// Fast-path for set matching.
|
|
if m.Type == labels.MatchRegexp {
|
|
setMatches := findSetMatches(m.GetRegexString())
|
|
if len(setMatches) > 0 {
|
|
sort.Strings(setMatches)
|
|
return ix.Postings(m.Name, setMatches...)
|
|
}
|
|
}
|
|
|
|
vals, err := ix.LabelValues(m.Name)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
var res []string
|
|
lastVal, isSorted := "", true
|
|
for _, val := range vals {
|
|
if m.Matches(val) {
|
|
res = append(res, val)
|
|
if isSorted && val < lastVal {
|
|
isSorted = false
|
|
}
|
|
lastVal = val
|
|
}
|
|
}
|
|
|
|
if len(res) == 0 {
|
|
return index.EmptyPostings(), nil
|
|
}
|
|
|
|
if !isSorted {
|
|
sort.Strings(res)
|
|
}
|
|
return ix.Postings(m.Name, res...)
|
|
}
|
|
|
|
// inversePostingsForMatcher returns the postings for the series with the label name set but not matching the matcher.
|
|
func inversePostingsForMatcher(ix IndexReader, m *labels.Matcher) (index.Postings, error) {
|
|
vals, err := ix.LabelValues(m.Name)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
var res []string
|
|
lastVal, isSorted := "", true
|
|
for _, val := range vals {
|
|
if !m.Matches(val) {
|
|
res = append(res, val)
|
|
if isSorted && val < lastVal {
|
|
isSorted = false
|
|
}
|
|
lastVal = val
|
|
}
|
|
}
|
|
|
|
if !isSorted {
|
|
sort.Strings(res)
|
|
}
|
|
return ix.Postings(m.Name, res...)
|
|
}
|
|
|
|
func labelValuesWithMatchers(r IndexReader, name string, matchers ...*labels.Matcher) ([]string, error) {
|
|
// We're only interested in metrics which have the label <name>.
|
|
requireLabel, err := labels.NewMatcher(labels.MatchNotEqual, name, "")
|
|
if err != nil {
|
|
return nil, errors.Wrapf(err, "Failed to instantiate label matcher")
|
|
}
|
|
|
|
var p index.Postings
|
|
p, err = PostingsForMatchers(r, append(matchers, requireLabel)...)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
dedupe := map[string]interface{}{}
|
|
for p.Next() {
|
|
v, err := r.LabelValueFor(p.At(), name)
|
|
if err != nil {
|
|
if err == storage.ErrNotFound {
|
|
continue
|
|
}
|
|
|
|
return nil, err
|
|
}
|
|
dedupe[v] = nil
|
|
}
|
|
|
|
if err = p.Err(); err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
values := make([]string, 0, len(dedupe))
|
|
for value := range dedupe {
|
|
values = append(values, value)
|
|
}
|
|
|
|
return values, nil
|
|
}
|
|
|
|
func labelNamesWithMatchers(r IndexReader, matchers ...*labels.Matcher) ([]string, error) {
|
|
p, err := PostingsForMatchers(r, matchers...)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
var postings []uint64
|
|
for p.Next() {
|
|
postings = append(postings, p.At())
|
|
}
|
|
if p.Err() != nil {
|
|
return nil, errors.Wrapf(p.Err(), "postings for label names with matchers")
|
|
}
|
|
|
|
return r.LabelNamesFor(postings...)
|
|
}
|
|
|
|
// blockBaseSeriesSet allows to iterate over all series in the single block.
|
|
// Iterated series are trimmed with given min and max time as well as tombstones.
|
|
// See newBlockSeriesSet and newBlockChunkSeriesSet to use it for either sample or chunk iterating.
|
|
type blockBaseSeriesSet struct {
|
|
p index.Postings
|
|
index IndexReader
|
|
chunks ChunkReader
|
|
tombstones tombstones.Reader
|
|
mint, maxt int64
|
|
|
|
currIterFn func() *populateWithDelGenericSeriesIterator
|
|
currLabels labels.Labels
|
|
|
|
bufChks []chunks.Meta
|
|
bufLbls labels.Labels
|
|
err error
|
|
}
|
|
|
|
func (b *blockBaseSeriesSet) Next() bool {
|
|
for b.p.Next() {
|
|
if err := b.index.Series(b.p.At(), &b.bufLbls, &b.bufChks); err != nil {
|
|
// Postings may be stale. Skip if no underlying series exists.
|
|
if errors.Cause(err) == storage.ErrNotFound {
|
|
continue
|
|
}
|
|
b.err = errors.Wrapf(err, "get series %d", b.p.At())
|
|
return false
|
|
}
|
|
|
|
if len(b.bufChks) == 0 {
|
|
continue
|
|
}
|
|
|
|
intervals, err := b.tombstones.Get(b.p.At())
|
|
if err != nil {
|
|
b.err = errors.Wrap(err, "get tombstones")
|
|
return false
|
|
}
|
|
|
|
// NOTE:
|
|
// * block time range is half-open: [meta.MinTime, meta.MaxTime).
|
|
// * chunks are both closed: [chk.MinTime, chk.MaxTime].
|
|
// * requested time ranges are closed: [req.Start, req.End].
|
|
|
|
var trimFront, trimBack bool
|
|
|
|
// Copy chunks as iterables are reusable.
|
|
chks := make([]chunks.Meta, 0, len(b.bufChks))
|
|
|
|
// Prefilter chunks and pick those which are not entirely deleted or totally outside of the requested range.
|
|
for _, chk := range b.bufChks {
|
|
if chk.MaxTime < b.mint {
|
|
continue
|
|
}
|
|
if chk.MinTime > b.maxt {
|
|
continue
|
|
}
|
|
|
|
if !(tombstones.Interval{Mint: chk.MinTime, Maxt: chk.MaxTime}.IsSubrange(intervals)) {
|
|
chks = append(chks, chk)
|
|
}
|
|
|
|
// If still not entirely deleted, check if trim is needed based on requested time range.
|
|
if chk.MinTime < b.mint {
|
|
trimFront = true
|
|
}
|
|
if chk.MaxTime > b.maxt {
|
|
trimBack = true
|
|
}
|
|
}
|
|
|
|
if len(chks) == 0 {
|
|
continue
|
|
}
|
|
|
|
if trimFront {
|
|
intervals = intervals.Add(tombstones.Interval{Mint: math.MinInt64, Maxt: b.mint - 1})
|
|
}
|
|
if trimBack {
|
|
intervals = intervals.Add(tombstones.Interval{Mint: b.maxt + 1, Maxt: math.MaxInt64})
|
|
}
|
|
|
|
b.currLabels = make(labels.Labels, len(b.bufLbls))
|
|
copy(b.currLabels, b.bufLbls)
|
|
|
|
b.currIterFn = func() *populateWithDelGenericSeriesIterator {
|
|
return newPopulateWithDelGenericSeriesIterator(b.chunks, chks, intervals)
|
|
}
|
|
return true
|
|
}
|
|
return false
|
|
}
|
|
|
|
func (b *blockBaseSeriesSet) Err() error {
|
|
if b.err != nil {
|
|
return b.err
|
|
}
|
|
return b.p.Err()
|
|
}
|
|
|
|
func (b *blockBaseSeriesSet) Warnings() storage.Warnings { return nil }
|
|
|
|
// populateWithDelGenericSeriesIterator allows to iterate over given chunk metas. In each iteration it ensures
|
|
// that chunks are trimmed based on given tombstones interval if any.
|
|
//
|
|
// populateWithDelGenericSeriesIterator assumes that chunks that would be fully removed by intervals are filtered out in previous phase.
|
|
//
|
|
// On each iteration currChkMeta is available. If currDelIter is not nil, it means that chunk iterator in currChkMeta
|
|
// is invalid and chunk rewrite is needed, currDelIter should be used.
|
|
type populateWithDelGenericSeriesIterator struct {
|
|
chunks ChunkReader
|
|
// chks are expected to be sorted by minTime and should be related to the same, single series.
|
|
chks []chunks.Meta
|
|
|
|
i int
|
|
err error
|
|
bufIter *DeletedIterator
|
|
intervals tombstones.Intervals
|
|
|
|
currDelIter chunkenc.Iterator
|
|
currChkMeta chunks.Meta
|
|
}
|
|
|
|
func newPopulateWithDelGenericSeriesIterator(
|
|
chunks ChunkReader,
|
|
chks []chunks.Meta,
|
|
intervals tombstones.Intervals,
|
|
) *populateWithDelGenericSeriesIterator {
|
|
return &populateWithDelGenericSeriesIterator{
|
|
chunks: chunks,
|
|
chks: chks,
|
|
i: -1,
|
|
bufIter: &DeletedIterator{},
|
|
intervals: intervals,
|
|
}
|
|
}
|
|
|
|
func (p *populateWithDelGenericSeriesIterator) next() bool {
|
|
if p.err != nil || p.i >= len(p.chks)-1 {
|
|
return false
|
|
}
|
|
|
|
p.i++
|
|
p.currChkMeta = p.chks[p.i]
|
|
|
|
p.currChkMeta.Chunk, p.err = p.chunks.Chunk(p.currChkMeta.Ref)
|
|
if p.err != nil {
|
|
p.err = errors.Wrapf(p.err, "cannot populate chunk %d", p.currChkMeta.Ref)
|
|
return false
|
|
}
|
|
|
|
p.bufIter.Intervals = p.bufIter.Intervals[:0]
|
|
for _, interval := range p.intervals {
|
|
if p.currChkMeta.OverlapsClosedInterval(interval.Mint, interval.Maxt) {
|
|
p.bufIter.Intervals = p.bufIter.Intervals.Add(interval)
|
|
}
|
|
}
|
|
|
|
// Re-encode head chunks that are still open (being appended to) or
|
|
// outside the compacted MaxTime range.
|
|
// The chunk.Bytes() method is not safe for open chunks hence the re-encoding.
|
|
// This happens when snapshotting the head block or just fetching chunks from TSDB.
|
|
//
|
|
// TODO think how to avoid the typecasting to verify when it is head block.
|
|
_, isSafeChunk := p.currChkMeta.Chunk.(*safeChunk)
|
|
if len(p.bufIter.Intervals) == 0 && !(isSafeChunk && p.currChkMeta.MaxTime == math.MaxInt64) {
|
|
// If there are no overlap with deletion intervals AND it's NOT an "open" head chunk, we can take chunk as it is.
|
|
p.currDelIter = nil
|
|
return true
|
|
}
|
|
|
|
// We don't want full chunk or it's potentially still opened, take just part of it.
|
|
p.bufIter.Iter = p.currChkMeta.Chunk.Iterator(nil)
|
|
p.currDelIter = p.bufIter
|
|
return true
|
|
}
|
|
|
|
func (p *populateWithDelGenericSeriesIterator) Err() error { return p.err }
|
|
|
|
func (p *populateWithDelGenericSeriesIterator) toSeriesIterator() chunkenc.Iterator {
|
|
return &populateWithDelSeriesIterator{populateWithDelGenericSeriesIterator: p}
|
|
}
|
|
func (p *populateWithDelGenericSeriesIterator) toChunkSeriesIterator() chunks.Iterator {
|
|
return &populateWithDelChunkSeriesIterator{populateWithDelGenericSeriesIterator: p}
|
|
}
|
|
|
|
// populateWithDelSeriesIterator allows to iterate over samples for the single series.
|
|
type populateWithDelSeriesIterator struct {
|
|
*populateWithDelGenericSeriesIterator
|
|
|
|
curr chunkenc.Iterator
|
|
}
|
|
|
|
func (p *populateWithDelSeriesIterator) Next() bool {
|
|
if p.curr != nil && p.curr.Next() {
|
|
return true
|
|
}
|
|
|
|
for p.next() {
|
|
if p.currDelIter != nil {
|
|
p.curr = p.currDelIter
|
|
} else {
|
|
p.curr = p.currChkMeta.Chunk.Iterator(nil)
|
|
}
|
|
if p.curr.Next() {
|
|
return true
|
|
}
|
|
}
|
|
return false
|
|
}
|
|
|
|
func (p *populateWithDelSeriesIterator) Seek(t int64) bool {
|
|
if p.curr != nil && p.curr.Seek(t) {
|
|
return true
|
|
}
|
|
for p.Next() {
|
|
if p.curr.Seek(t) {
|
|
return true
|
|
}
|
|
}
|
|
return false
|
|
}
|
|
|
|
func (p *populateWithDelSeriesIterator) At() (int64, float64) { return p.curr.At() }
|
|
func (p *populateWithDelSeriesIterator) AtHistogram() (int64, histogram.Histogram) {
|
|
return p.curr.AtHistogram()
|
|
}
|
|
func (p *populateWithDelSeriesIterator) ChunkEncoding() chunkenc.Encoding {
|
|
return p.curr.ChunkEncoding()
|
|
}
|
|
|
|
func (p *populateWithDelSeriesIterator) Err() error {
|
|
if err := p.populateWithDelGenericSeriesIterator.Err(); err != nil {
|
|
return err
|
|
}
|
|
if p.curr != nil {
|
|
return p.curr.Err()
|
|
}
|
|
return nil
|
|
}
|
|
|
|
type populateWithDelChunkSeriesIterator struct {
|
|
*populateWithDelGenericSeriesIterator
|
|
|
|
curr chunks.Meta
|
|
}
|
|
|
|
func (p *populateWithDelChunkSeriesIterator) Next() bool {
|
|
if !p.next() {
|
|
return false
|
|
}
|
|
|
|
p.curr = p.currChkMeta
|
|
if p.currDelIter == nil {
|
|
return true
|
|
}
|
|
|
|
// Re-encode the chunk if iterator is provider. This means that it has some samples to be deleted or chunk is opened.
|
|
var (
|
|
newChunk chunkenc.Chunk
|
|
app chunkenc.Appender
|
|
err error
|
|
)
|
|
if p.currDelIter.ChunkEncoding() == chunkenc.EncHistogram {
|
|
newChunk = chunkenc.NewHistogramChunk()
|
|
app, err = newChunk.Appender()
|
|
} else {
|
|
newChunk = chunkenc.NewXORChunk()
|
|
app, err = newChunk.Appender()
|
|
}
|
|
if err != nil {
|
|
p.err = err
|
|
return false
|
|
}
|
|
|
|
if !p.currDelIter.Next() {
|
|
if err := p.currDelIter.Err(); err != nil {
|
|
p.err = errors.Wrap(err, "iterate chunk while re-encoding")
|
|
return false
|
|
}
|
|
|
|
// Empty chunk, this should not happen, as we assume full deletions being filtered before this iterator.
|
|
p.err = errors.Wrap(err, "populateWithDelChunkSeriesIterator: unexpected empty chunk found while rewriting chunk")
|
|
return false
|
|
}
|
|
|
|
var (
|
|
t int64
|
|
v float64
|
|
h histogram.Histogram
|
|
)
|
|
if p.currDelIter.ChunkEncoding() == chunkenc.EncHistogram {
|
|
if hc, ok := p.currChkMeta.Chunk.(*chunkenc.HistogramChunk); ok {
|
|
newChunk.(*chunkenc.HistogramChunk).SetCounterResetHeader(hc.GetCounterResetHeader())
|
|
}
|
|
t, h = p.currDelIter.AtHistogram()
|
|
p.curr.MinTime = t
|
|
app.AppendHistogram(t, h.Copy())
|
|
for p.currDelIter.Next() {
|
|
t, h = p.currDelIter.AtHistogram()
|
|
app.AppendHistogram(t, h.Copy())
|
|
}
|
|
} else {
|
|
t, v = p.currDelIter.At()
|
|
p.curr.MinTime = t
|
|
app.Append(t, v)
|
|
for p.currDelIter.Next() {
|
|
t, v = p.currDelIter.At()
|
|
app.Append(t, v)
|
|
}
|
|
}
|
|
|
|
if err := p.currDelIter.Err(); err != nil {
|
|
p.err = errors.Wrap(err, "iterate chunk while re-encoding")
|
|
return false
|
|
}
|
|
|
|
p.curr.Chunk = newChunk
|
|
p.curr.MaxTime = t
|
|
return true
|
|
}
|
|
|
|
func (p *populateWithDelChunkSeriesIterator) At() chunks.Meta { return p.curr }
|
|
|
|
// blockSeriesSet allows to iterate over sorted, populated series with applied tombstones.
|
|
// Series with all deleted chunks are still present as Series with no samples.
|
|
// Samples from chunks are also trimmed to requested min and max time.
|
|
type blockSeriesSet struct {
|
|
blockBaseSeriesSet
|
|
}
|
|
|
|
func newBlockSeriesSet(i IndexReader, c ChunkReader, t tombstones.Reader, p index.Postings, mint, maxt int64) storage.SeriesSet {
|
|
return &blockSeriesSet{
|
|
blockBaseSeriesSet{
|
|
index: i,
|
|
chunks: c,
|
|
tombstones: t,
|
|
p: p,
|
|
mint: mint,
|
|
maxt: maxt,
|
|
bufLbls: make(labels.Labels, 0, 10),
|
|
},
|
|
}
|
|
}
|
|
|
|
func (b *blockSeriesSet) At() storage.Series {
|
|
// At can be looped over before iterating, so save the current value locally.
|
|
currIterFn := b.currIterFn
|
|
return &storage.SeriesEntry{
|
|
Lset: b.currLabels,
|
|
SampleIteratorFn: func() chunkenc.Iterator {
|
|
return currIterFn().toSeriesIterator()
|
|
},
|
|
}
|
|
}
|
|
|
|
// blockChunkSeriesSet allows to iterate over sorted, populated series with applied tombstones.
|
|
// Series with all deleted chunks are still present as Labelled iterator with no chunks.
|
|
// Chunks are also trimmed to requested [min and max] (keeping samples with min and max timestamps).
|
|
type blockChunkSeriesSet struct {
|
|
blockBaseSeriesSet
|
|
}
|
|
|
|
func newBlockChunkSeriesSet(i IndexReader, c ChunkReader, t tombstones.Reader, p index.Postings, mint, maxt int64) storage.ChunkSeriesSet {
|
|
return &blockChunkSeriesSet{
|
|
blockBaseSeriesSet{
|
|
index: i,
|
|
chunks: c,
|
|
tombstones: t,
|
|
p: p,
|
|
mint: mint,
|
|
maxt: maxt,
|
|
bufLbls: make(labels.Labels, 0, 10),
|
|
},
|
|
}
|
|
}
|
|
|
|
func (b *blockChunkSeriesSet) At() storage.ChunkSeries {
|
|
// At can be looped over before iterating, so save the current value locally.
|
|
currIterFn := b.currIterFn
|
|
return &storage.ChunkSeriesEntry{
|
|
Lset: b.currLabels,
|
|
ChunkIteratorFn: func() chunks.Iterator {
|
|
return currIterFn().toChunkSeriesIterator()
|
|
},
|
|
}
|
|
}
|
|
|
|
// NewMergedStringIter returns string iterator that allows to merge symbols on demand and stream result.
|
|
func NewMergedStringIter(a index.StringIter, b index.StringIter) index.StringIter {
|
|
return &mergedStringIter{a: a, b: b, aok: a.Next(), bok: b.Next()}
|
|
}
|
|
|
|
type mergedStringIter struct {
|
|
a index.StringIter
|
|
b index.StringIter
|
|
aok, bok bool
|
|
cur string
|
|
}
|
|
|
|
func (m *mergedStringIter) Next() bool {
|
|
if (!m.aok && !m.bok) || (m.Err() != nil) {
|
|
return false
|
|
}
|
|
|
|
if !m.aok {
|
|
m.cur = m.b.At()
|
|
m.bok = m.b.Next()
|
|
} else if !m.bok {
|
|
m.cur = m.a.At()
|
|
m.aok = m.a.Next()
|
|
} else if m.b.At() > m.a.At() {
|
|
m.cur = m.a.At()
|
|
m.aok = m.a.Next()
|
|
} else if m.a.At() > m.b.At() {
|
|
m.cur = m.b.At()
|
|
m.bok = m.b.Next()
|
|
} else { // Equal.
|
|
m.cur = m.b.At()
|
|
m.aok = m.a.Next()
|
|
m.bok = m.b.Next()
|
|
}
|
|
|
|
return true
|
|
}
|
|
func (m mergedStringIter) At() string { return m.cur }
|
|
func (m mergedStringIter) Err() error {
|
|
if m.a.Err() != nil {
|
|
return m.a.Err()
|
|
}
|
|
return m.b.Err()
|
|
}
|
|
|
|
// DeletedIterator wraps chunk Iterator and makes sure any deleted metrics are not returned.
|
|
type DeletedIterator struct {
|
|
// Iter is an Iterator to be wrapped.
|
|
Iter chunkenc.Iterator
|
|
// Intervals are the deletion intervals.
|
|
Intervals tombstones.Intervals
|
|
}
|
|
|
|
func (it *DeletedIterator) At() (int64, float64) {
|
|
return it.Iter.At()
|
|
}
|
|
|
|
func (it *DeletedIterator) AtHistogram() (int64, histogram.Histogram) {
|
|
t, h := it.Iter.AtHistogram()
|
|
return t, h
|
|
}
|
|
|
|
func (it *DeletedIterator) ChunkEncoding() chunkenc.Encoding {
|
|
return it.Iter.ChunkEncoding()
|
|
}
|
|
|
|
func (it *DeletedIterator) Seek(t int64) bool {
|
|
if it.Iter.Err() != nil {
|
|
return false
|
|
}
|
|
if ok := it.Iter.Seek(t); !ok {
|
|
return false
|
|
}
|
|
|
|
// Now double check if the entry falls into a deleted interval.
|
|
var ts int64
|
|
if it.ChunkEncoding() == chunkenc.EncHistogram {
|
|
ts, _ = it.AtHistogram()
|
|
} else {
|
|
ts, _ = it.At()
|
|
}
|
|
for _, itv := range it.Intervals {
|
|
if ts < itv.Mint {
|
|
return true
|
|
}
|
|
|
|
if ts > itv.Maxt {
|
|
it.Intervals = it.Intervals[1:]
|
|
continue
|
|
}
|
|
|
|
// We're in the middle of an interval, we can now call Next().
|
|
return it.Next()
|
|
}
|
|
|
|
// The timestamp is greater than all the deleted intervals.
|
|
return true
|
|
}
|
|
|
|
func (it *DeletedIterator) Next() bool {
|
|
Outer:
|
|
for it.Iter.Next() {
|
|
var ts int64
|
|
if it.ChunkEncoding() == chunkenc.EncHistogram {
|
|
ts, _ = it.AtHistogram()
|
|
} else {
|
|
ts, _ = it.At()
|
|
}
|
|
|
|
for _, tr := range it.Intervals {
|
|
if tr.InBounds(ts) {
|
|
continue Outer
|
|
}
|
|
|
|
if ts <= tr.Maxt {
|
|
return true
|
|
|
|
}
|
|
it.Intervals = it.Intervals[1:]
|
|
}
|
|
return true
|
|
}
|
|
return false
|
|
}
|
|
|
|
func (it *DeletedIterator) Err() error { return it.Iter.Err() }
|
|
|
|
type nopChunkReader struct {
|
|
emptyChunk chunkenc.Chunk
|
|
}
|
|
|
|
func newNopChunkReader() ChunkReader {
|
|
return nopChunkReader{
|
|
emptyChunk: chunkenc.NewXORChunk(),
|
|
}
|
|
}
|
|
|
|
func (cr nopChunkReader) Chunk(ref uint64) (chunkenc.Chunk, error) { return cr.emptyChunk, nil }
|
|
|
|
func (cr nopChunkReader) Close() error { return nil }
|