prometheus/storage/fanin/fanin.go

244 lines
6.8 KiB
Go

// Copyright 2017 The Prometheus Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package fanin
import (
"sort"
"time"
"golang.org/x/net/context"
"github.com/prometheus/common/model"
"github.com/prometheus/prometheus/promql"
"github.com/prometheus/prometheus/storage/local"
"github.com/prometheus/prometheus/storage/metric"
"github.com/prometheus/prometheus/storage/remote"
)
type contextKey string
const ctxLocalOnly contextKey = "local-only"
// WithLocalOnly decorates a context to indicate that a query should
// only be executed against local data.
func WithLocalOnly(ctx context.Context) context.Context {
return context.WithValue(ctx, ctxLocalOnly, struct{}{})
}
func localOnly(ctx context.Context) bool {
return ctx.Value(ctxLocalOnly) == struct{}{}
}
// Queryable is a local.Queryable that reads from local and remote storage.
type Queryable struct {
Local promql.Queryable
Remote *remote.Reader
}
// Querier implements local.Queryable.
func (q Queryable) Querier() (local.Querier, error) {
localQuerier, err := q.Local.Querier()
if err != nil {
return nil, err
}
fq := querier{
local: localQuerier,
remotes: q.Remote.Queriers(),
}
return fq, nil
}
type querier struct {
local local.Querier
remotes []local.Querier
}
func (q querier) QueryRange(ctx context.Context, from, through model.Time, matchers ...*metric.LabelMatcher) ([]local.SeriesIterator, error) {
return q.query(ctx, func(q local.Querier) ([]local.SeriesIterator, error) {
return q.QueryRange(ctx, from, through, matchers...)
})
}
func (q querier) QueryInstant(ctx context.Context, ts model.Time, stalenessDelta time.Duration, matchers ...*metric.LabelMatcher) ([]local.SeriesIterator, error) {
return q.query(ctx, func(q local.Querier) ([]local.SeriesIterator, error) {
return q.QueryInstant(ctx, ts, stalenessDelta, matchers...)
})
}
func (q querier) query(ctx context.Context, qFn func(q local.Querier) ([]local.SeriesIterator, error)) ([]local.SeriesIterator, error) {
localIts, err := qFn(q.local)
if err != nil {
return nil, err
}
if len(q.remotes) == 0 || localOnly(ctx) {
return localIts, nil
}
fpToIt := map[model.Fingerprint]*mergeIterator{}
for _, it := range localIts {
fp := it.Metric().Metric.Fingerprint()
fpToIt[fp] = &mergeIterator{local: it}
}
for _, q := range q.remotes {
its, err := qFn(q)
if err != nil {
return nil, err
}
mergeIterators(fpToIt, its)
}
its := make([]local.SeriesIterator, 0, len(fpToIt))
for _, it := range fpToIt {
its = append(its, it)
}
return its, nil
}
func (q querier) MetricsForLabelMatchers(ctx context.Context, from, through model.Time, matcherSets ...metric.LabelMatchers) ([]metric.Metric, error) {
return q.local.MetricsForLabelMatchers(ctx, from, through, matcherSets...)
}
func (q querier) LastSampleForLabelMatchers(ctx context.Context, cutoff model.Time, matcherSets ...metric.LabelMatchers) (model.Vector, error) {
return q.local.LastSampleForLabelMatchers(ctx, cutoff, matcherSets...)
}
func (q querier) LabelValuesForLabelName(ctx context.Context, ln model.LabelName) (model.LabelValues, error) {
return q.local.LabelValuesForLabelName(ctx, ln)
}
func (q querier) Close() error {
if q.local != nil {
if err := q.local.Close(); err != nil {
return err
}
}
for _, q := range q.remotes {
if err := q.Close(); err != nil {
return err
}
}
return nil
}
// mergeIterator is a SeriesIterator which merges query results for local and remote
// SeriesIterators. If a series has samples in a local iterator, remote samples are
// only considered before the first local sample of a series. This is to avoid things
// like downsampling on the side of the remote storage to interfere with rate(),
// irate(), etc.
type mergeIterator struct {
local local.SeriesIterator
remote []local.SeriesIterator
}
func (mit mergeIterator) ValueAtOrBeforeTime(t model.Time) model.SamplePair {
latest := model.ZeroSamplePair
if mit.local != nil {
latest = mit.local.ValueAtOrBeforeTime(t)
}
// We only need to look for a remote last sample if we don't have a local one
// at all. If we have a local one, by definition we would not care about earlier
// "last" samples, and we would not consider later ones as well, because we
// generally only consider remote samples that are older than the oldest
// local sample.
if latest == model.ZeroSamplePair {
for _, it := range mit.remote {
v := it.ValueAtOrBeforeTime(t)
if v.Timestamp.After(latest.Timestamp) {
latest = v
}
}
}
return latest
}
func (mit mergeIterator) RangeValues(interval metric.Interval) []model.SamplePair {
remoteCutoff := model.Latest
var values []model.SamplePair
if mit.local != nil {
values = mit.local.RangeValues(interval)
if len(values) > 0 {
remoteCutoff = values[0].Timestamp
}
}
for _, it := range mit.remote {
vs := it.RangeValues(interval)
n := sort.Search(len(vs), func(i int) bool {
return !vs[i].Timestamp.Before(remoteCutoff)
})
values = mergeSamples(values, vs[:n])
}
return values
}
func (mit mergeIterator) Metric() metric.Metric {
if mit.local != nil {
return mit.local.Metric()
}
// If there is no local iterator, there has to be at least one remote one in
// order for this iterator to have been created.
return mit.remote[0].Metric()
}
func (mit mergeIterator) Close() {
if mit.local != nil {
mit.local.Close()
}
for _, it := range mit.remote {
it.Close()
}
}
func mergeIterators(fpToIt map[model.Fingerprint]*mergeIterator, its []local.SeriesIterator) {
for _, it := range its {
fp := it.Metric().Metric.Fingerprint()
if fpIts, ok := fpToIt[fp]; !ok {
fpToIt[fp] = &mergeIterator{remote: []local.SeriesIterator{it}}
} else {
fpToIt[fp].remote = append(fpIts.remote, it)
}
}
}
// mergeSamples merges two lists of sample pairs and removes duplicate
// timestamps. It assumes that both lists are sorted by timestamp.
func mergeSamples(a, b []model.SamplePair) []model.SamplePair {
result := make([]model.SamplePair, 0, len(a)+len(b))
i, j := 0, 0
for i < len(a) && j < len(b) {
if a[i].Timestamp < b[j].Timestamp {
result = append(result, a[i])
i++
} else if a[i].Timestamp > b[j].Timestamp {
result = append(result, b[j])
j++
} else {
result = append(result, a[i])
i++
j++
}
}
result = append(result, a[i:]...)
result = append(result, b[j:]...)
return result
}