mirror of
https://github.com/prometheus/prometheus
synced 2024-12-25 07:52:28 +00:00
8907ba6235
This fixes #6992, which was introduced by #6777. There was an intermediate component which translated TSDB errors into storage errors, but that component was deleted and this bug went unnoticed, until we were watching at the Prombench results. Without this, scrape will fail instead of dropping samples or using "Add" when the series have been garbage collected. Signed-off-by: Julien Pivotto <roidelapluie@inuits.eu>
1211 lines
28 KiB
Go
1211 lines
28 KiB
Go
// Copyright 2017 The Prometheus Authors
|
|
// Licensed under the Apache License, Version 2.0 (the "License");
|
|
// you may not use this file except in compliance with the License.
|
|
// You may obtain a copy of the License at
|
|
//
|
|
// http://www.apache.org/licenses/LICENSE-2.0
|
|
//
|
|
// Unless required by applicable law or agreed to in writing, software
|
|
// distributed under the License is distributed on an "AS IS" BASIS,
|
|
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
// See the License for the specific language governing permissions and
|
|
// limitations under the License.
|
|
|
|
package tsdb
|
|
|
|
import (
|
|
"sort"
|
|
"strings"
|
|
"unicode/utf8"
|
|
|
|
"github.com/pkg/errors"
|
|
"github.com/prometheus/prometheus/pkg/labels"
|
|
"github.com/prometheus/prometheus/storage"
|
|
"github.com/prometheus/prometheus/tsdb/chunkenc"
|
|
"github.com/prometheus/prometheus/tsdb/chunks"
|
|
tsdb_errors "github.com/prometheus/prometheus/tsdb/errors"
|
|
"github.com/prometheus/prometheus/tsdb/index"
|
|
"github.com/prometheus/prometheus/tsdb/tombstones"
|
|
)
|
|
|
|
// querier aggregates querying results from time blocks within
|
|
// a single partition.
|
|
type querier struct {
|
|
blocks []storage.Querier
|
|
}
|
|
|
|
func (q *querier) LabelValues(n string) ([]string, storage.Warnings, error) {
|
|
return q.lvals(q.blocks, n)
|
|
}
|
|
|
|
// LabelNames returns all the unique label names present querier blocks.
|
|
func (q *querier) LabelNames() ([]string, storage.Warnings, error) {
|
|
labelNamesMap := make(map[string]struct{})
|
|
var ws storage.Warnings
|
|
for _, b := range q.blocks {
|
|
names, w, err := b.LabelNames()
|
|
ws = append(ws, w...)
|
|
if err != nil {
|
|
return nil, ws, errors.Wrap(err, "LabelNames() from Querier")
|
|
}
|
|
for _, name := range names {
|
|
labelNamesMap[name] = struct{}{}
|
|
}
|
|
}
|
|
|
|
labelNames := make([]string, 0, len(labelNamesMap))
|
|
for name := range labelNamesMap {
|
|
labelNames = append(labelNames, name)
|
|
}
|
|
sort.Strings(labelNames)
|
|
|
|
return labelNames, ws, nil
|
|
}
|
|
|
|
func (q *querier) lvals(qs []storage.Querier, n string) ([]string, storage.Warnings, error) {
|
|
if len(qs) == 0 {
|
|
return nil, nil, nil
|
|
}
|
|
if len(qs) == 1 {
|
|
return qs[0].LabelValues(n)
|
|
}
|
|
l := len(qs) / 2
|
|
|
|
var ws storage.Warnings
|
|
s1, w, err := q.lvals(qs[:l], n)
|
|
ws = append(ws, w...)
|
|
if err != nil {
|
|
return nil, ws, err
|
|
}
|
|
s2, ws, err := q.lvals(qs[l:], n)
|
|
ws = append(ws, w...)
|
|
if err != nil {
|
|
return nil, ws, err
|
|
}
|
|
return mergeStrings(s1, s2), ws, nil
|
|
}
|
|
|
|
func (q *querier) Select(p *storage.SelectParams, ms ...*labels.Matcher) (storage.SeriesSet, storage.Warnings, error) {
|
|
if len(q.blocks) != 1 {
|
|
return q.SelectSorted(p, ms...)
|
|
}
|
|
// Sorting Head series is slow, and unneeded when only the
|
|
// Head is being queried. Sorting blocks is a noop.
|
|
return q.blocks[0].Select(p, ms...)
|
|
}
|
|
|
|
func (q *querier) SelectSorted(p *storage.SelectParams, ms ...*labels.Matcher) (storage.SeriesSet, storage.Warnings, error) {
|
|
if len(q.blocks) == 0 {
|
|
return storage.EmptySeriesSet(), nil, nil
|
|
}
|
|
ss := make([]storage.SeriesSet, len(q.blocks))
|
|
var ws storage.Warnings
|
|
for i, b := range q.blocks {
|
|
s, w, err := b.SelectSorted(p, ms...)
|
|
ws = append(ws, w...)
|
|
if err != nil {
|
|
return nil, ws, err
|
|
}
|
|
ss[i] = s
|
|
}
|
|
|
|
return NewMergedSeriesSet(ss), ws, nil
|
|
}
|
|
|
|
func (q *querier) Close() error {
|
|
var merr tsdb_errors.MultiError
|
|
|
|
for _, bq := range q.blocks {
|
|
merr.Add(bq.Close())
|
|
}
|
|
return merr.Err()
|
|
}
|
|
|
|
// verticalQuerier aggregates querying results from time blocks within
|
|
// a single partition. The block time ranges can be overlapping.
|
|
type verticalQuerier struct {
|
|
querier
|
|
}
|
|
|
|
func (q *verticalQuerier) Select(p *storage.SelectParams, ms ...*labels.Matcher) (storage.SeriesSet, storage.Warnings, error) {
|
|
return q.sel(p, q.blocks, ms)
|
|
}
|
|
|
|
func (q *verticalQuerier) SelectSorted(p *storage.SelectParams, ms ...*labels.Matcher) (storage.SeriesSet, storage.Warnings, error) {
|
|
return q.sel(p, q.blocks, ms)
|
|
}
|
|
|
|
func (q *verticalQuerier) sel(p *storage.SelectParams, qs []storage.Querier, ms []*labels.Matcher) (storage.SeriesSet, storage.Warnings, error) {
|
|
if len(qs) == 0 {
|
|
return storage.EmptySeriesSet(), nil, nil
|
|
}
|
|
if len(qs) == 1 {
|
|
return qs[0].SelectSorted(p, ms...)
|
|
}
|
|
l := len(qs) / 2
|
|
|
|
var ws storage.Warnings
|
|
a, w, err := q.sel(p, qs[:l], ms)
|
|
ws = append(ws, w...)
|
|
if err != nil {
|
|
return nil, ws, err
|
|
}
|
|
b, w, err := q.sel(p, qs[l:], ms)
|
|
ws = append(ws, w...)
|
|
if err != nil {
|
|
return nil, ws, err
|
|
}
|
|
return newMergedVerticalSeriesSet(a, b), ws, nil
|
|
}
|
|
|
|
// NewBlockQuerier returns a querier against the reader.
|
|
func NewBlockQuerier(b BlockReader, mint, maxt int64) (storage.Querier, error) {
|
|
indexr, err := b.Index(mint, maxt)
|
|
if err != nil {
|
|
return nil, errors.Wrapf(err, "open index reader")
|
|
}
|
|
chunkr, err := b.Chunks()
|
|
if err != nil {
|
|
indexr.Close()
|
|
return nil, errors.Wrapf(err, "open chunk reader")
|
|
}
|
|
tombsr, err := b.Tombstones()
|
|
if err != nil {
|
|
indexr.Close()
|
|
chunkr.Close()
|
|
return nil, errors.Wrapf(err, "open tombstone reader")
|
|
}
|
|
return &blockQuerier{
|
|
mint: mint,
|
|
maxt: maxt,
|
|
index: indexr,
|
|
chunks: chunkr,
|
|
tombstones: tombsr,
|
|
}, nil
|
|
}
|
|
|
|
// blockQuerier provides querying access to a single block database.
|
|
type blockQuerier struct {
|
|
index IndexReader
|
|
chunks ChunkReader
|
|
tombstones tombstones.Reader
|
|
|
|
closed bool
|
|
|
|
mint, maxt int64
|
|
}
|
|
|
|
func (q *blockQuerier) Select(p *storage.SelectParams, ms ...*labels.Matcher) (storage.SeriesSet, storage.Warnings, error) {
|
|
base, err := LookupChunkSeries(q.index, q.tombstones, ms...)
|
|
if err != nil {
|
|
return nil, nil, err
|
|
}
|
|
|
|
mint := q.mint
|
|
maxt := q.maxt
|
|
if p != nil {
|
|
mint = p.Start
|
|
maxt = p.End
|
|
}
|
|
return &blockSeriesSet{
|
|
set: &populatedChunkSeries{
|
|
set: base,
|
|
chunks: q.chunks,
|
|
mint: mint,
|
|
maxt: maxt,
|
|
},
|
|
|
|
mint: mint,
|
|
maxt: maxt,
|
|
}, nil, nil
|
|
}
|
|
|
|
func (q *blockQuerier) SelectSorted(p *storage.SelectParams, ms ...*labels.Matcher) (storage.SeriesSet, storage.Warnings, error) {
|
|
base, err := LookupChunkSeriesSorted(q.index, q.tombstones, ms...)
|
|
if err != nil {
|
|
return nil, nil, err
|
|
}
|
|
|
|
mint := q.mint
|
|
maxt := q.maxt
|
|
if p != nil {
|
|
mint = p.Start
|
|
maxt = p.End
|
|
}
|
|
return &blockSeriesSet{
|
|
set: &populatedChunkSeries{
|
|
set: base,
|
|
chunks: q.chunks,
|
|
mint: mint,
|
|
maxt: maxt,
|
|
},
|
|
|
|
mint: mint,
|
|
maxt: maxt,
|
|
}, nil, nil
|
|
}
|
|
|
|
func (q *blockQuerier) LabelValues(name string) ([]string, storage.Warnings, error) {
|
|
res, err := q.index.LabelValues(name)
|
|
return res, nil, err
|
|
}
|
|
|
|
func (q *blockQuerier) LabelNames() ([]string, storage.Warnings, error) {
|
|
res, err := q.index.LabelNames()
|
|
return res, nil, err
|
|
}
|
|
|
|
func (q *blockQuerier) Close() error {
|
|
if q.closed {
|
|
return errors.New("block querier already closed")
|
|
}
|
|
|
|
var merr tsdb_errors.MultiError
|
|
merr.Add(q.index.Close())
|
|
merr.Add(q.chunks.Close())
|
|
merr.Add(q.tombstones.Close())
|
|
q.closed = true
|
|
return merr.Err()
|
|
}
|
|
|
|
// Bitmap used by func isRegexMetaCharacter to check whether a character needs to be escaped.
|
|
var regexMetaCharacterBytes [16]byte
|
|
|
|
// isRegexMetaCharacter reports whether byte b needs to be escaped.
|
|
func isRegexMetaCharacter(b byte) bool {
|
|
return b < utf8.RuneSelf && regexMetaCharacterBytes[b%16]&(1<<(b/16)) != 0
|
|
}
|
|
|
|
func init() {
|
|
for _, b := range []byte(`.+*?()|[]{}^$`) {
|
|
regexMetaCharacterBytes[b%16] |= 1 << (b / 16)
|
|
}
|
|
}
|
|
|
|
func findSetMatches(pattern string) []string {
|
|
// Return empty matches if the wrapper from Prometheus is missing.
|
|
if len(pattern) < 6 || pattern[:4] != "^(?:" || pattern[len(pattern)-2:] != ")$" {
|
|
return nil
|
|
}
|
|
escaped := false
|
|
sets := []*strings.Builder{{}}
|
|
for i := 4; i < len(pattern)-2; i++ {
|
|
if escaped {
|
|
switch {
|
|
case isRegexMetaCharacter(pattern[i]):
|
|
sets[len(sets)-1].WriteByte(pattern[i])
|
|
case pattern[i] == '\\':
|
|
sets[len(sets)-1].WriteByte('\\')
|
|
default:
|
|
return nil
|
|
}
|
|
escaped = false
|
|
} else {
|
|
switch {
|
|
case isRegexMetaCharacter(pattern[i]):
|
|
if pattern[i] == '|' {
|
|
sets = append(sets, &strings.Builder{})
|
|
} else {
|
|
return nil
|
|
}
|
|
case pattern[i] == '\\':
|
|
escaped = true
|
|
default:
|
|
sets[len(sets)-1].WriteByte(pattern[i])
|
|
}
|
|
}
|
|
}
|
|
matches := make([]string, 0, len(sets))
|
|
for _, s := range sets {
|
|
if s.Len() > 0 {
|
|
matches = append(matches, s.String())
|
|
}
|
|
}
|
|
return matches
|
|
}
|
|
|
|
// PostingsForMatchers assembles a single postings iterator against the index reader
|
|
// based on the given matchers. The resulting postings are not ordered by series.
|
|
func PostingsForMatchers(ix IndexReader, ms ...*labels.Matcher) (index.Postings, error) {
|
|
var its, notIts []index.Postings
|
|
// See which label must be non-empty.
|
|
// Optimization for case like {l=~".", l!="1"}.
|
|
labelMustBeSet := make(map[string]bool, len(ms))
|
|
for _, m := range ms {
|
|
if !m.Matches("") {
|
|
labelMustBeSet[m.Name] = true
|
|
}
|
|
}
|
|
|
|
for _, m := range ms {
|
|
if labelMustBeSet[m.Name] {
|
|
// If this matcher must be non-empty, we can be smarter.
|
|
matchesEmpty := m.Matches("")
|
|
isNot := m.Type == labels.MatchNotEqual || m.Type == labels.MatchNotRegexp
|
|
if isNot && matchesEmpty { // l!="foo"
|
|
// If the label can't be empty and is a Not and the inner matcher
|
|
// doesn't match empty, then subtract it out at the end.
|
|
inverse, err := m.Inverse()
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
it, err := postingsForMatcher(ix, inverse)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
notIts = append(notIts, it)
|
|
} else if isNot && !matchesEmpty { // l!=""
|
|
// If the label can't be empty and is a Not, but the inner matcher can
|
|
// be empty we need to use inversePostingsForMatcher.
|
|
inverse, err := m.Inverse()
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
it, err := inversePostingsForMatcher(ix, inverse)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
its = append(its, it)
|
|
} else { // l="a"
|
|
// Non-Not matcher, use normal postingsForMatcher.
|
|
it, err := postingsForMatcher(ix, m)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
its = append(its, it)
|
|
}
|
|
} else { // l=""
|
|
// If the matchers for a labelname selects an empty value, it selects all
|
|
// the series which don't have the label name set too. See:
|
|
// https://github.com/prometheus/prometheus/issues/3575 and
|
|
// https://github.com/prometheus/prometheus/pull/3578#issuecomment-351653555
|
|
it, err := inversePostingsForMatcher(ix, m)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
notIts = append(notIts, it)
|
|
}
|
|
}
|
|
|
|
// If there's nothing to subtract from, add in everything and remove the notIts later.
|
|
if len(its) == 0 && len(notIts) != 0 {
|
|
k, v := index.AllPostingsKey()
|
|
allPostings, err := ix.Postings(k, v)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
its = append(its, allPostings)
|
|
}
|
|
|
|
it := index.Intersect(its...)
|
|
|
|
for _, n := range notIts {
|
|
it = index.Without(it, n)
|
|
}
|
|
|
|
return it, nil
|
|
}
|
|
|
|
func postingsForMatcher(ix IndexReader, m *labels.Matcher) (index.Postings, error) {
|
|
// This method will not return postings for missing labels.
|
|
|
|
// Fast-path for equal matching.
|
|
if m.Type == labels.MatchEqual {
|
|
return ix.Postings(m.Name, m.Value)
|
|
}
|
|
|
|
// Fast-path for set matching.
|
|
if m.Type == labels.MatchRegexp {
|
|
setMatches := findSetMatches(m.GetRegexString())
|
|
if len(setMatches) > 0 {
|
|
sort.Strings(setMatches)
|
|
return ix.Postings(m.Name, setMatches...)
|
|
}
|
|
}
|
|
|
|
vals, err := ix.LabelValues(m.Name)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
var res []string
|
|
for _, val := range vals {
|
|
if m.Matches(val) {
|
|
res = append(res, val)
|
|
}
|
|
}
|
|
|
|
if len(res) == 0 {
|
|
return index.EmptyPostings(), nil
|
|
}
|
|
|
|
return ix.Postings(m.Name, res...)
|
|
}
|
|
|
|
// inversePostingsForMatcher returns the postings for the series with the label name set but not matching the matcher.
|
|
func inversePostingsForMatcher(ix IndexReader, m *labels.Matcher) (index.Postings, error) {
|
|
vals, err := ix.LabelValues(m.Name)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
var res []string
|
|
for _, val := range vals {
|
|
if !m.Matches(val) {
|
|
res = append(res, val)
|
|
}
|
|
}
|
|
|
|
return ix.Postings(m.Name, res...)
|
|
}
|
|
|
|
func mergeStrings(a, b []string) []string {
|
|
maxl := len(a)
|
|
if len(b) > len(a) {
|
|
maxl = len(b)
|
|
}
|
|
res := make([]string, 0, maxl*10/9)
|
|
|
|
for len(a) > 0 && len(b) > 0 {
|
|
d := strings.Compare(a[0], b[0])
|
|
|
|
if d == 0 {
|
|
res = append(res, a[0])
|
|
a, b = a[1:], b[1:]
|
|
} else if d < 0 {
|
|
res = append(res, a[0])
|
|
a = a[1:]
|
|
} else if d > 0 {
|
|
res = append(res, b[0])
|
|
b = b[1:]
|
|
}
|
|
}
|
|
|
|
// Append all remaining elements.
|
|
res = append(res, a...)
|
|
res = append(res, b...)
|
|
return res
|
|
}
|
|
|
|
// mergedSeriesSet returns a series sets slice as a single series set. The input series sets
|
|
// must be sorted and sequential in time.
|
|
// TODO(bwplotka): Merge this with merge SeriesSet available in storage package.
|
|
type mergedSeriesSet struct {
|
|
all []storage.SeriesSet
|
|
buf []storage.SeriesSet // A buffer for keeping the order of SeriesSet slice during forwarding the SeriesSet.
|
|
ids []int // The indices of chosen SeriesSet for the current run.
|
|
done bool
|
|
err error
|
|
cur storage.Series
|
|
}
|
|
|
|
// TODO(bwplotka): Merge this with merge SeriesSet available in storage package.
|
|
func NewMergedSeriesSet(all []storage.SeriesSet) storage.SeriesSet {
|
|
if len(all) == 1 {
|
|
return all[0]
|
|
}
|
|
s := &mergedSeriesSet{all: all}
|
|
// Initialize first elements of all sets as Next() needs
|
|
// one element look-ahead.
|
|
s.nextAll()
|
|
if len(s.all) == 0 {
|
|
s.done = true
|
|
}
|
|
|
|
return s
|
|
}
|
|
|
|
func (s *mergedSeriesSet) At() storage.Series {
|
|
return s.cur
|
|
}
|
|
|
|
func (s *mergedSeriesSet) Err() error {
|
|
return s.err
|
|
}
|
|
|
|
// nextAll is to call Next() for all SeriesSet.
|
|
// Because the order of the SeriesSet slice will affect the results,
|
|
// we need to use an buffer slice to hold the order.
|
|
func (s *mergedSeriesSet) nextAll() {
|
|
s.buf = s.buf[:0]
|
|
for _, ss := range s.all {
|
|
if ss.Next() {
|
|
s.buf = append(s.buf, ss)
|
|
} else if ss.Err() != nil {
|
|
s.done = true
|
|
s.err = ss.Err()
|
|
break
|
|
}
|
|
}
|
|
s.all, s.buf = s.buf, s.all
|
|
}
|
|
|
|
// nextWithID is to call Next() for the SeriesSet with the indices of s.ids.
|
|
// Because the order of the SeriesSet slice will affect the results,
|
|
// we need to use an buffer slice to hold the order.
|
|
func (s *mergedSeriesSet) nextWithID() {
|
|
if len(s.ids) == 0 {
|
|
return
|
|
}
|
|
|
|
s.buf = s.buf[:0]
|
|
i1 := 0
|
|
i2 := 0
|
|
for i1 < len(s.all) {
|
|
if i2 < len(s.ids) && i1 == s.ids[i2] {
|
|
if !s.all[s.ids[i2]].Next() {
|
|
if s.all[s.ids[i2]].Err() != nil {
|
|
s.done = true
|
|
s.err = s.all[s.ids[i2]].Err()
|
|
break
|
|
}
|
|
i2++
|
|
i1++
|
|
continue
|
|
}
|
|
i2++
|
|
}
|
|
s.buf = append(s.buf, s.all[i1])
|
|
i1++
|
|
}
|
|
s.all, s.buf = s.buf, s.all
|
|
}
|
|
|
|
func (s *mergedSeriesSet) Next() bool {
|
|
if s.done {
|
|
return false
|
|
}
|
|
|
|
s.nextWithID()
|
|
if s.done {
|
|
return false
|
|
}
|
|
s.ids = s.ids[:0]
|
|
if len(s.all) == 0 {
|
|
s.done = true
|
|
return false
|
|
}
|
|
|
|
// Here we are looking for a set of series sets with the lowest labels,
|
|
// and we will cache their indexes in s.ids.
|
|
s.ids = append(s.ids, 0)
|
|
for i := 1; i < len(s.all); i++ {
|
|
cmp := labels.Compare(s.all[s.ids[0]].At().Labels(), s.all[i].At().Labels())
|
|
if cmp > 0 {
|
|
s.ids = s.ids[:1]
|
|
s.ids[0] = i
|
|
} else if cmp == 0 {
|
|
s.ids = append(s.ids, i)
|
|
}
|
|
}
|
|
|
|
if len(s.ids) > 1 {
|
|
series := make([]storage.Series, len(s.ids))
|
|
for i, idx := range s.ids {
|
|
series[i] = s.all[idx].At()
|
|
}
|
|
s.cur = &chainedSeries{series: series}
|
|
} else {
|
|
s.cur = s.all[s.ids[0]].At()
|
|
}
|
|
return true
|
|
}
|
|
|
|
type mergedVerticalSeriesSet struct {
|
|
a, b storage.SeriesSet
|
|
cur storage.Series
|
|
adone, bdone bool
|
|
}
|
|
|
|
// NewMergedVerticalSeriesSet takes two series sets as a single series set.
|
|
// The input series sets must be sorted and
|
|
// the time ranges of the series can be overlapping.
|
|
func NewMergedVerticalSeriesSet(a, b storage.SeriesSet) storage.SeriesSet {
|
|
return newMergedVerticalSeriesSet(a, b)
|
|
}
|
|
|
|
func newMergedVerticalSeriesSet(a, b storage.SeriesSet) *mergedVerticalSeriesSet {
|
|
s := &mergedVerticalSeriesSet{a: a, b: b}
|
|
// Initialize first elements of both sets as Next() needs
|
|
// one element look-ahead.
|
|
s.adone = !s.a.Next()
|
|
s.bdone = !s.b.Next()
|
|
|
|
return s
|
|
}
|
|
|
|
func (s *mergedVerticalSeriesSet) At() storage.Series {
|
|
return s.cur
|
|
}
|
|
|
|
func (s *mergedVerticalSeriesSet) Err() error {
|
|
if s.a.Err() != nil {
|
|
return s.a.Err()
|
|
}
|
|
return s.b.Err()
|
|
}
|
|
|
|
func (s *mergedVerticalSeriesSet) compare() int {
|
|
if s.adone {
|
|
return 1
|
|
}
|
|
if s.bdone {
|
|
return -1
|
|
}
|
|
return labels.Compare(s.a.At().Labels(), s.b.At().Labels())
|
|
}
|
|
|
|
func (s *mergedVerticalSeriesSet) Next() bool {
|
|
if s.adone && s.bdone || s.Err() != nil {
|
|
return false
|
|
}
|
|
|
|
d := s.compare()
|
|
|
|
// Both sets contain the current series. Chain them into a single one.
|
|
if d > 0 {
|
|
s.cur = s.b.At()
|
|
s.bdone = !s.b.Next()
|
|
} else if d < 0 {
|
|
s.cur = s.a.At()
|
|
s.adone = !s.a.Next()
|
|
} else {
|
|
s.cur = &verticalChainedSeries{series: []storage.Series{s.a.At(), s.b.At()}}
|
|
s.adone = !s.a.Next()
|
|
s.bdone = !s.b.Next()
|
|
}
|
|
return true
|
|
}
|
|
|
|
// baseChunkSeries loads the label set and chunk references for a postings
|
|
// list from an index. It filters out series that have labels set that should be unset.
|
|
type baseChunkSeries struct {
|
|
p index.Postings
|
|
index IndexReader
|
|
tombstones tombstones.Reader
|
|
|
|
lset labels.Labels
|
|
chks []chunks.Meta
|
|
intervals tombstones.Intervals
|
|
err error
|
|
}
|
|
|
|
// LookupChunkSeries retrieves all series for the given matchers and returns a ChunkSeriesSet
|
|
// over them. It drops chunks based on tombstones in the given reader.
|
|
func LookupChunkSeries(ir IndexReader, tr tombstones.Reader, ms ...*labels.Matcher) (storage.ChunkSeriesSet, error) {
|
|
return lookupChunkSeries(false, ir, tr, ms...)
|
|
}
|
|
|
|
// LookupChunkSeries retrieves all series for the given matchers and returns a ChunkSeriesSet
|
|
// over them. It drops chunks based on tombstones in the given reader. Series will be in order.
|
|
func LookupChunkSeriesSorted(ir IndexReader, tr tombstones.Reader, ms ...*labels.Matcher) (storage.ChunkSeriesSet, error) {
|
|
return lookupChunkSeries(true, ir, tr, ms...)
|
|
}
|
|
|
|
func lookupChunkSeries(sorted bool, ir IndexReader, tr tombstones.Reader, ms ...*labels.Matcher) (storage.ChunkSeriesSet, error) {
|
|
if tr == nil {
|
|
tr = tombstones.NewMemTombstones()
|
|
}
|
|
p, err := PostingsForMatchers(ir, ms...)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
if sorted {
|
|
p = ir.SortedPostings(p)
|
|
}
|
|
return &baseChunkSeries{
|
|
p: p,
|
|
index: ir,
|
|
tombstones: tr,
|
|
}, nil
|
|
}
|
|
|
|
func (s *baseChunkSeries) At() (labels.Labels, []chunks.Meta, tombstones.Intervals) {
|
|
return s.lset, s.chks, s.intervals
|
|
}
|
|
|
|
func (s *baseChunkSeries) Err() error { return s.err }
|
|
|
|
func (s *baseChunkSeries) Next() bool {
|
|
var (
|
|
lset = make(labels.Labels, len(s.lset))
|
|
chkMetas = make([]chunks.Meta, len(s.chks))
|
|
err error
|
|
)
|
|
|
|
for s.p.Next() {
|
|
ref := s.p.At()
|
|
if err := s.index.Series(ref, &lset, &chkMetas); err != nil {
|
|
// Postings may be stale. Skip if no underlying series exists.
|
|
if errors.Cause(err) == storage.ErrNotFound {
|
|
continue
|
|
}
|
|
s.err = err
|
|
return false
|
|
}
|
|
|
|
s.lset = lset
|
|
s.chks = chkMetas
|
|
s.intervals, err = s.tombstones.Get(s.p.At())
|
|
if err != nil {
|
|
s.err = errors.Wrap(err, "get tombstones")
|
|
return false
|
|
}
|
|
|
|
if len(s.intervals) > 0 {
|
|
// Only those chunks that are not entirely deleted.
|
|
chks := make([]chunks.Meta, 0, len(s.chks))
|
|
for _, chk := range s.chks {
|
|
if !(tombstones.Interval{Mint: chk.MinTime, Maxt: chk.MaxTime}.IsSubrange(s.intervals)) {
|
|
chks = append(chks, chk)
|
|
}
|
|
}
|
|
|
|
s.chks = chks
|
|
}
|
|
|
|
return true
|
|
}
|
|
if err := s.p.Err(); err != nil {
|
|
s.err = err
|
|
}
|
|
return false
|
|
}
|
|
|
|
// populatedChunkSeries loads chunk data from a store for a set of series
|
|
// with known chunk references. It filters out chunks that do not fit the
|
|
// given time range.
|
|
type populatedChunkSeries struct {
|
|
set storage.ChunkSeriesSet
|
|
chunks ChunkReader
|
|
mint, maxt int64
|
|
|
|
err error
|
|
chks []chunks.Meta
|
|
lset labels.Labels
|
|
intervals tombstones.Intervals
|
|
}
|
|
|
|
func (s *populatedChunkSeries) At() (labels.Labels, []chunks.Meta, tombstones.Intervals) {
|
|
return s.lset, s.chks, s.intervals
|
|
}
|
|
|
|
func (s *populatedChunkSeries) Err() error { return s.err }
|
|
|
|
func (s *populatedChunkSeries) Next() bool {
|
|
for s.set.Next() {
|
|
lset, chks, dranges := s.set.At()
|
|
|
|
for len(chks) > 0 {
|
|
if chks[0].MaxTime >= s.mint {
|
|
break
|
|
}
|
|
chks = chks[1:]
|
|
}
|
|
|
|
// This is to delete in place while iterating.
|
|
for i, rlen := 0, len(chks); i < rlen; i++ {
|
|
j := i - (rlen - len(chks))
|
|
c := &chks[j]
|
|
|
|
// Break out at the first chunk that has no overlap with mint, maxt.
|
|
if c.MinTime > s.maxt {
|
|
chks = chks[:j]
|
|
break
|
|
}
|
|
|
|
c.Chunk, s.err = s.chunks.Chunk(c.Ref)
|
|
if s.err != nil {
|
|
// This means that the chunk has be garbage collected. Remove it from the list.
|
|
if s.err == storage.ErrNotFound {
|
|
s.err = nil
|
|
// Delete in-place.
|
|
s.chks = append(chks[:j], chks[j+1:]...)
|
|
}
|
|
return false
|
|
}
|
|
}
|
|
|
|
if len(chks) == 0 {
|
|
continue
|
|
}
|
|
|
|
s.lset = lset
|
|
s.chks = chks
|
|
s.intervals = dranges
|
|
|
|
return true
|
|
}
|
|
if err := s.set.Err(); err != nil {
|
|
s.err = err
|
|
}
|
|
return false
|
|
}
|
|
|
|
// blockSeriesSet is a set of series from an inverted index query.
|
|
type blockSeriesSet struct {
|
|
set storage.ChunkSeriesSet
|
|
err error
|
|
cur storage.Series
|
|
|
|
mint, maxt int64
|
|
}
|
|
|
|
func (s *blockSeriesSet) Next() bool {
|
|
for s.set.Next() {
|
|
lset, chunks, dranges := s.set.At()
|
|
s.cur = &chunkSeries{
|
|
labels: lset,
|
|
chunks: chunks,
|
|
mint: s.mint,
|
|
maxt: s.maxt,
|
|
|
|
intervals: dranges,
|
|
}
|
|
return true
|
|
}
|
|
if s.set.Err() != nil {
|
|
s.err = s.set.Err()
|
|
}
|
|
return false
|
|
}
|
|
|
|
func (s *blockSeriesSet) At() storage.Series { return s.cur }
|
|
func (s *blockSeriesSet) Err() error { return s.err }
|
|
|
|
// chunkSeries is a series that is backed by a sequence of chunks holding
|
|
// time series data.
|
|
type chunkSeries struct {
|
|
labels labels.Labels
|
|
chunks []chunks.Meta // in-order chunk refs
|
|
|
|
mint, maxt int64
|
|
|
|
intervals tombstones.Intervals
|
|
}
|
|
|
|
func (s *chunkSeries) Labels() labels.Labels {
|
|
return s.labels
|
|
}
|
|
|
|
func (s *chunkSeries) Iterator() chunkenc.Iterator {
|
|
return newChunkSeriesIterator(s.chunks, s.intervals, s.mint, s.maxt)
|
|
}
|
|
|
|
// chainedSeries implements a series for a list of time-sorted series.
|
|
// They all must have the same labels.
|
|
type chainedSeries struct {
|
|
series []storage.Series
|
|
}
|
|
|
|
func (s *chainedSeries) Labels() labels.Labels {
|
|
return s.series[0].Labels()
|
|
}
|
|
|
|
func (s *chainedSeries) Iterator() chunkenc.Iterator {
|
|
return newChainedSeriesIterator(s.series...)
|
|
}
|
|
|
|
// chainedSeriesIterator implements a series iterator over a list
|
|
// of time-sorted, non-overlapping iterators.
|
|
type chainedSeriesIterator struct {
|
|
series []storage.Series // series in time order
|
|
|
|
i int
|
|
cur chunkenc.Iterator
|
|
}
|
|
|
|
func newChainedSeriesIterator(s ...storage.Series) *chainedSeriesIterator {
|
|
return &chainedSeriesIterator{
|
|
series: s,
|
|
i: 0,
|
|
cur: s[0].Iterator(),
|
|
}
|
|
}
|
|
|
|
func (it *chainedSeriesIterator) Seek(t int64) bool {
|
|
// We just scan the chained series sequentially as they are already
|
|
// pre-selected by relevant time and should be accessed sequentially anyway.
|
|
for i, s := range it.series[it.i:] {
|
|
cur := s.Iterator()
|
|
if !cur.Seek(t) {
|
|
continue
|
|
}
|
|
it.cur = cur
|
|
it.i += i
|
|
return true
|
|
}
|
|
return false
|
|
}
|
|
|
|
func (it *chainedSeriesIterator) Next() bool {
|
|
if it.cur.Next() {
|
|
return true
|
|
}
|
|
if err := it.cur.Err(); err != nil {
|
|
return false
|
|
}
|
|
if it.i == len(it.series)-1 {
|
|
return false
|
|
}
|
|
|
|
it.i++
|
|
it.cur = it.series[it.i].Iterator()
|
|
|
|
return it.Next()
|
|
}
|
|
|
|
func (it *chainedSeriesIterator) At() (t int64, v float64) {
|
|
return it.cur.At()
|
|
}
|
|
|
|
func (it *chainedSeriesIterator) Err() error {
|
|
return it.cur.Err()
|
|
}
|
|
|
|
// verticalChainedSeries implements a series for a list of time-sorted, time-overlapping series.
|
|
// They all must have the same labels.
|
|
type verticalChainedSeries struct {
|
|
series []storage.Series
|
|
}
|
|
|
|
func (s *verticalChainedSeries) Labels() labels.Labels {
|
|
return s.series[0].Labels()
|
|
}
|
|
|
|
func (s *verticalChainedSeries) Iterator() chunkenc.Iterator {
|
|
return newVerticalMergeSeriesIterator(s.series...)
|
|
}
|
|
|
|
// verticalMergeSeriesIterator implements a series iterator over a list
|
|
// of time-sorted, time-overlapping iterators.
|
|
type verticalMergeSeriesIterator struct {
|
|
a, b chunkenc.Iterator
|
|
aok, bok, initialized bool
|
|
|
|
curT int64
|
|
curV float64
|
|
}
|
|
|
|
func newVerticalMergeSeriesIterator(s ...storage.Series) chunkenc.Iterator {
|
|
if len(s) == 1 {
|
|
return s[0].Iterator()
|
|
} else if len(s) == 2 {
|
|
return &verticalMergeSeriesIterator{
|
|
a: s[0].Iterator(),
|
|
b: s[1].Iterator(),
|
|
}
|
|
}
|
|
return &verticalMergeSeriesIterator{
|
|
a: s[0].Iterator(),
|
|
b: newVerticalMergeSeriesIterator(s[1:]...),
|
|
}
|
|
}
|
|
|
|
func (it *verticalMergeSeriesIterator) Seek(t int64) bool {
|
|
it.aok, it.bok = it.a.Seek(t), it.b.Seek(t)
|
|
it.initialized = true
|
|
return it.Next()
|
|
}
|
|
|
|
func (it *verticalMergeSeriesIterator) Next() bool {
|
|
if !it.initialized {
|
|
it.aok = it.a.Next()
|
|
it.bok = it.b.Next()
|
|
it.initialized = true
|
|
}
|
|
|
|
if !it.aok && !it.bok {
|
|
return false
|
|
}
|
|
|
|
if !it.aok {
|
|
it.curT, it.curV = it.b.At()
|
|
it.bok = it.b.Next()
|
|
return true
|
|
}
|
|
if !it.bok {
|
|
it.curT, it.curV = it.a.At()
|
|
it.aok = it.a.Next()
|
|
return true
|
|
}
|
|
|
|
acurT, acurV := it.a.At()
|
|
bcurT, bcurV := it.b.At()
|
|
if acurT < bcurT {
|
|
it.curT, it.curV = acurT, acurV
|
|
it.aok = it.a.Next()
|
|
} else if acurT > bcurT {
|
|
it.curT, it.curV = bcurT, bcurV
|
|
it.bok = it.b.Next()
|
|
} else {
|
|
it.curT, it.curV = bcurT, bcurV
|
|
it.aok = it.a.Next()
|
|
it.bok = it.b.Next()
|
|
}
|
|
return true
|
|
}
|
|
|
|
func (it *verticalMergeSeriesIterator) At() (t int64, v float64) {
|
|
return it.curT, it.curV
|
|
}
|
|
|
|
func (it *verticalMergeSeriesIterator) Err() error {
|
|
if it.a.Err() != nil {
|
|
return it.a.Err()
|
|
}
|
|
return it.b.Err()
|
|
}
|
|
|
|
// chunkSeriesIterator implements a series iterator on top
|
|
// of a list of time-sorted, non-overlapping chunks.
|
|
type chunkSeriesIterator struct {
|
|
chunks []chunks.Meta
|
|
|
|
i int
|
|
cur chunkenc.Iterator
|
|
bufDelIter *deletedIterator
|
|
|
|
maxt, mint int64
|
|
|
|
intervals tombstones.Intervals
|
|
}
|
|
|
|
func newChunkSeriesIterator(cs []chunks.Meta, dranges tombstones.Intervals, mint, maxt int64) *chunkSeriesIterator {
|
|
csi := &chunkSeriesIterator{
|
|
chunks: cs,
|
|
i: 0,
|
|
|
|
mint: mint,
|
|
maxt: maxt,
|
|
|
|
intervals: dranges,
|
|
}
|
|
csi.resetCurIterator()
|
|
|
|
return csi
|
|
}
|
|
|
|
func (it *chunkSeriesIterator) resetCurIterator() {
|
|
if len(it.intervals) == 0 {
|
|
it.cur = it.chunks[it.i].Chunk.Iterator(it.cur)
|
|
return
|
|
}
|
|
if it.bufDelIter == nil {
|
|
it.bufDelIter = &deletedIterator{
|
|
intervals: it.intervals,
|
|
}
|
|
}
|
|
it.bufDelIter.it = it.chunks[it.i].Chunk.Iterator(it.bufDelIter.it)
|
|
it.cur = it.bufDelIter
|
|
}
|
|
|
|
func (it *chunkSeriesIterator) Seek(t int64) (ok bool) {
|
|
if t > it.maxt {
|
|
return false
|
|
}
|
|
|
|
// Seek to the first valid value after t.
|
|
if t < it.mint {
|
|
t = it.mint
|
|
}
|
|
|
|
for ; it.chunks[it.i].MaxTime < t; it.i++ {
|
|
if it.i == len(it.chunks)-1 {
|
|
return false
|
|
}
|
|
}
|
|
|
|
it.resetCurIterator()
|
|
|
|
for it.cur.Next() {
|
|
t0, _ := it.cur.At()
|
|
if t0 >= t {
|
|
return true
|
|
}
|
|
}
|
|
return false
|
|
}
|
|
|
|
func (it *chunkSeriesIterator) At() (t int64, v float64) {
|
|
return it.cur.At()
|
|
}
|
|
|
|
func (it *chunkSeriesIterator) Next() bool {
|
|
if it.cur.Next() {
|
|
t, _ := it.cur.At()
|
|
|
|
if t < it.mint {
|
|
if !it.Seek(it.mint) {
|
|
return false
|
|
}
|
|
t, _ = it.At()
|
|
|
|
return t <= it.maxt
|
|
}
|
|
if t > it.maxt {
|
|
return false
|
|
}
|
|
return true
|
|
}
|
|
if err := it.cur.Err(); err != nil {
|
|
return false
|
|
}
|
|
if it.i == len(it.chunks)-1 {
|
|
return false
|
|
}
|
|
|
|
it.i++
|
|
it.resetCurIterator()
|
|
|
|
return it.Next()
|
|
}
|
|
|
|
func (it *chunkSeriesIterator) Err() error {
|
|
return it.cur.Err()
|
|
}
|
|
|
|
// deletedIterator wraps an Iterator and makes sure any deleted metrics are not
|
|
// returned.
|
|
type deletedIterator struct {
|
|
it chunkenc.Iterator
|
|
|
|
intervals tombstones.Intervals
|
|
}
|
|
|
|
func (it *deletedIterator) At() (int64, float64) {
|
|
return it.it.At()
|
|
}
|
|
|
|
func (it *deletedIterator) Seek(t int64) bool {
|
|
if it.it.Err() != nil {
|
|
return false
|
|
}
|
|
return it.it.Seek(t)
|
|
}
|
|
|
|
func (it *deletedIterator) Next() bool {
|
|
Outer:
|
|
for it.it.Next() {
|
|
ts, _ := it.it.At()
|
|
|
|
for _, tr := range it.intervals {
|
|
if tr.InBounds(ts) {
|
|
continue Outer
|
|
}
|
|
|
|
if ts <= tr.Maxt {
|
|
return true
|
|
|
|
}
|
|
it.intervals = it.intervals[1:]
|
|
}
|
|
return true
|
|
}
|
|
return false
|
|
}
|
|
|
|
func (it *deletedIterator) Err() error { return it.it.Err() }
|