Fast path the merge querier (#3358)

* Fast path the merge querier such that it is completely removed from query path when there is no remote storage.

* Add NoopQuerier

* Add copyright notice.

* Avoid global, use a function.
This commit is contained in:
Tom Wilkie 2017-10-27 12:29:05 +01:00 committed by Fabian Reinartz
parent 323556b025
commit 48a7a00a38
2 changed files with 79 additions and 9 deletions

View File

@ -63,27 +63,26 @@ func (f *fanout) StartTime() (int64, error) {
} }
func (f *fanout) Querier(ctx context.Context, mint, maxt int64) (Querier, error) { func (f *fanout) Querier(ctx context.Context, mint, maxt int64) (Querier, error) {
queriers := mergeQuerier{ queriers := make([]Querier, 0, 1+len(f.secondaries))
queriers: make([]Querier, 0, 1+len(f.secondaries)),
}
// Add primary querier // Add primary querier
querier, err := f.primary.Querier(ctx, mint, maxt) querier, err := f.primary.Querier(ctx, mint, maxt)
if err != nil { if err != nil {
return nil, err return nil, err
} }
queriers.queriers = append(queriers.queriers, querier) queriers = append(queriers, querier)
// Add secondary queriers // Add secondary queriers
for _, storage := range f.secondaries { for _, storage := range f.secondaries {
querier, err := storage.Querier(ctx, mint, maxt) querier, err := storage.Querier(ctx, mint, maxt)
if err != nil { if err != nil {
queriers.Close() NewMergeQuerier(queriers).Close()
return nil, err return nil, err
} }
queriers.queriers = append(queriers.queriers, querier) queriers = append(queriers, querier)
} }
return &queriers, nil
return NewMergeQuerier(queriers), nil
} }
func (f *fanout) Appender() (Appender, error) { func (f *fanout) Appender() (Appender, error) {
@ -193,9 +192,26 @@ type mergeQuerier struct {
} }
// NewMergeQuerier returns a new Querier that merges results of input queriers. // NewMergeQuerier returns a new Querier that merges results of input queriers.
// NB NewMergeQuerier will return NoopQuerier if no queriers are passed to it,
// and will filter NoopQueriers from its arguments, in order to reduce overhead
// when only one querier is passed.
func NewMergeQuerier(queriers []Querier) Querier { func NewMergeQuerier(queriers []Querier) Querier {
return &mergeQuerier{ filtered := make([]Querier, 0, len(queriers))
queriers: queriers, for _, querier := range queriers {
if querier != NoopQuerier() {
filtered = append(filtered, querier)
}
}
switch len(filtered) {
case 0:
return NoopQuerier()
case 1:
return filtered[0]
default:
return &mergeQuerier{
queriers: filtered,
}
} }
} }

54
storage/noop.go Normal file
View File

@ -0,0 +1,54 @@
// Copyright 2017 The Prometheus Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package storage
import "github.com/prometheus/prometheus/pkg/labels"
type noopQuerier struct{}
// NoopQuerier is a Querier that does nothing.
func NoopQuerier() Querier {
return noopQuerier{}
}
func (noopQuerier) Select(...*labels.Matcher) SeriesSet {
return NoopSeriesSet()
}
func (noopQuerier) LabelValues(name string) ([]string, error) {
return nil, nil
}
func (noopQuerier) Close() error {
return nil
}
type noopSeriesSet struct{}
// NoopSeriesSet is a SeriesSet that does nothing.
func NoopSeriesSet() SeriesSet {
return noopSeriesSet{}
}
func (noopSeriesSet) Next() bool {
return false
}
func (noopSeriesSet) At() Series {
return nil
}
func (noopSeriesSet) Err() error {
return nil
}