mirror of
https://github.com/prometheus/prometheus
synced 2024-12-25 16:02:28 +00:00
38d32e0686
Sorting the head's postings can be quite slow. We only need sorted series when merging with another querier, so only sort then. This will make big queries that only touch the head faster, though queries that touch both the head and a block will still be the same speed. This probably won't help much with graphing unless the range is under an hour, however it should make most recording rules faster. Add guarantee that remote read streaming produces sorted series. PromQL benchmarks for histograms show only 2-3% improvement, but they're only over 1k series. benchmark old ns/op new ns/op delta BenchmarkQuerierSelect/Head/1of1000000-4 1375486282 507657736 -63.09% BenchmarkQuerierSelect/Head/10of1000000-4 1387859004 507769850 -63.41% BenchmarkQuerierSelect/Head/100of1000000-4 1387087935 506029110 -63.52% BenchmarkQuerierSelect/Head/1000of1000000-4 1386869064 504521986 -63.62% BenchmarkQuerierSelect/Head/10000of1000000-4 1386213685 505210422 -63.55% BenchmarkQuerierSelect/Head/100000of1000000-4 1392754988 529842406 -61.96% BenchmarkQuerierSelect/Head/1000000of1000000-4 1569414722 725059506 -53.80% BenchmarkQuerierSelect/SortedHead/1of1000000-4 1381019902 1370495863 -0.76% BenchmarkQuerierSelect/SortedHead/10of1000000-4 1375696209 1366789468 -0.65% BenchmarkQuerierSelect/SortedHead/100of1000000-4 1386009422 1364519297 -1.55% BenchmarkQuerierSelect/SortedHead/1000of1000000-4 1377700532 1364486191 -0.96% BenchmarkQuerierSelect/SortedHead/10000of1000000-4 1383539536 1369545314 -1.01% BenchmarkQuerierSelect/SortedHead/100000of1000000-4 1410089163 1394731339 -1.09% BenchmarkQuerierSelect/SortedHead/1000000of1000000-4 1634744148 1581554956 -3.25% BenchmarkQuerierSelect/Block/1of1000000-4 881741242 879839470 -0.22% BenchmarkQuerierSelect/Block/10of1000000-4 880381562 882846038 +0.28% BenchmarkQuerierSelect/Block/100of1000000-4 887519357 881016916 -0.73% BenchmarkQuerierSelect/Block/1000of1000000-4 902194205 883433524 -2.08% BenchmarkQuerierSelect/Block/10000of1000000-4 892321964
885130170 -0.81% BenchmarkQuerierSelect/Block/100000of1000000-4 938604466 933527150 -0.54% BenchmarkQuerierSelect/Block/1000000of1000000-4 1313510845 1295881124 -1.34% benchmark old allocs new allocs delta BenchmarkQuerierSelect/Head/1of1000000-4 4000056 4000018 -0.00% BenchmarkQuerierSelect/Head/10of1000000-4 4000074 4000036 -0.00% BenchmarkQuerierSelect/Head/100of1000000-4 4000254 4000216 -0.00% BenchmarkQuerierSelect/Head/1000of1000000-4 4002054 4002016 -0.00% BenchmarkQuerierSelect/Head/10000of1000000-4 4020054 4020016 -0.00% BenchmarkQuerierSelect/Head/100000of1000000-4 4200054 4200016 -0.00% BenchmarkQuerierSelect/Head/1000000of1000000-4 6000054 6000016 -0.00% BenchmarkQuerierSelect/SortedHead/1of1000000-4 4000071 4000071 +0.00% BenchmarkQuerierSelect/SortedHead/10of1000000-4 4000089 4000089 +0.00% BenchmarkQuerierSelect/SortedHead/100of1000000-4 4000269 4000269 +0.00% BenchmarkQuerierSelect/SortedHead/1000of1000000-4 4002069 4002069 +0.00% BenchmarkQuerierSelect/SortedHead/10000of1000000-4 4020069 4020069 +0.00% BenchmarkQuerierSelect/SortedHead/100000of1000000-4 4200069 4200069 +0.00% BenchmarkQuerierSelect/SortedHead/1000000of1000000-4 6000069 6000069 +0.00% BenchmarkQuerierSelect/Block/1of1000000-4 6000023 6000022 -0.00% BenchmarkQuerierSelect/Block/10of1000000-4 6000059 6000058 -0.00% BenchmarkQuerierSelect/Block/100of1000000-4 6000419 6000418 -0.00% BenchmarkQuerierSelect/Block/1000of1000000-4 6004019 6004018 -0.00% BenchmarkQuerierSelect/Block/10000of1000000-4 6040019 6040018 -0.00% BenchmarkQuerierSelect/Block/100000of1000000-4 6400019 6400018 -0.00% BenchmarkQuerierSelect/Block/1000000of1000000-4 10000020 10000019 -0.00% benchmark old bytes new bytes delta BenchmarkQuerierSelect/Head/1of1000000-4 229192200 176001176 -23.21% BenchmarkQuerierSelect/Head/10of1000000-4 229193352 176002328 -23.21% BenchmarkQuerierSelect/Head/100of1000000-4 229204872 176013848 -23.21% BenchmarkQuerierSelect/Head/1000of1000000-4 229320072 176129048 -23.20% 
BenchmarkQuerierSelect/Head/10000of1000000-4 230472072 177281048 -23.08% BenchmarkQuerierSelect/Head/100000of1000000-4 241992072 188801048 -21.98% BenchmarkQuerierSelect/Head/1000000of1000000-4 357192072 304001048 -14.89% BenchmarkQuerierSelect/SortedHead/1of1000000-4 229193928 229193928 +0.00% BenchmarkQuerierSelect/SortedHead/10of1000000-4 229195080 229195080 +0.00% BenchmarkQuerierSelect/SortedHead/100of1000000-4 229206600 229206600 +0.00% BenchmarkQuerierSelect/SortedHead/1000of1000000-4 229321800 229321800 +0.00% BenchmarkQuerierSelect/SortedHead/10000of1000000-4 230473800 230473800 +0.00% BenchmarkQuerierSelect/SortedHead/100000of1000000-4 241993800 241993800 +0.00% BenchmarkQuerierSelect/SortedHead/1000000of1000000-4 357193800 357193800 +0.00% BenchmarkQuerierSelect/Block/1of1000000-4 227201516 227201500 -0.00% BenchmarkQuerierSelect/Block/10of1000000-4 227202924 227202908 -0.00% BenchmarkQuerierSelect/Block/100of1000000-4 227217036 227217020 -0.00% BenchmarkQuerierSelect/Block/1000of1000000-4 227358156 227358140 -0.00% BenchmarkQuerierSelect/Block/10000of1000000-4 228769356 228769340 -0.00% BenchmarkQuerierSelect/Block/100000of1000000-4 242881356 242881340 -0.00% BenchmarkQuerierSelect/Block/1000000of1000000-4 384001616 384001600 -0.00% Signed-off-by: Brian Brazil <brian.brazil@robustperception.io>
605 lines
14 KiB
Go
605 lines
14 KiB
Go
// Copyright 2017 The Prometheus Authors
|
|
// Licensed under the Apache License, Version 2.0 (the "License");
|
|
// you may not use this file except in compliance with the License.
|
|
// You may obtain a copy of the License at
|
|
//
|
|
// http://www.apache.org/licenses/LICENSE-2.0
|
|
//
|
|
// Unless required by applicable law or agreed to in writing, software
|
|
// distributed under the License is distributed on an "AS IS" BASIS,
|
|
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
// See the License for the specific language governing permissions and
|
|
// limitations under the License.
|
|
|
|
package storage
|
|
|
|
import (
|
|
"container/heap"
|
|
"context"
|
|
"sort"
|
|
"strings"
|
|
|
|
"github.com/go-kit/kit/log"
|
|
"github.com/go-kit/kit/log/level"
|
|
"github.com/pkg/errors"
|
|
"github.com/prometheus/common/model"
|
|
"github.com/prometheus/prometheus/pkg/labels"
|
|
)
|
|
|
|
type fanout struct {
|
|
logger log.Logger
|
|
|
|
primary Storage
|
|
secondaries []Storage
|
|
}
|
|
|
|
// NewFanout returns a new fan-out Storage, which proxies reads and writes
|
|
// through to multiple underlying storages.
|
|
func NewFanout(logger log.Logger, primary Storage, secondaries ...Storage) Storage {
|
|
return &fanout{
|
|
logger: logger,
|
|
primary: primary,
|
|
secondaries: secondaries,
|
|
}
|
|
}
|
|
|
|
// StartTime implements the Storage interface.
|
|
func (f *fanout) StartTime() (int64, error) {
|
|
// StartTime of a fanout should be the earliest StartTime of all its storages,
|
|
// both primary and secondaries.
|
|
firstTime, err := f.primary.StartTime()
|
|
if err != nil {
|
|
return int64(model.Latest), err
|
|
}
|
|
|
|
for _, storage := range f.secondaries {
|
|
t, err := storage.StartTime()
|
|
if err != nil {
|
|
return int64(model.Latest), err
|
|
}
|
|
if t < firstTime {
|
|
firstTime = t
|
|
}
|
|
}
|
|
return firstTime, nil
|
|
}
|
|
|
|
func (f *fanout) Querier(ctx context.Context, mint, maxt int64) (Querier, error) {
|
|
queriers := make([]Querier, 0, 1+len(f.secondaries))
|
|
|
|
// Add primary querier
|
|
primaryQuerier, err := f.primary.Querier(ctx, mint, maxt)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
queriers = append(queriers, primaryQuerier)
|
|
|
|
// Add secondary queriers
|
|
for _, storage := range f.secondaries {
|
|
querier, err := storage.Querier(ctx, mint, maxt)
|
|
if err != nil {
|
|
NewMergeQuerier(primaryQuerier, queriers).Close()
|
|
return nil, err
|
|
}
|
|
queriers = append(queriers, querier)
|
|
}
|
|
|
|
return NewMergeQuerier(primaryQuerier, queriers), nil
|
|
}
|
|
|
|
func (f *fanout) Appender() (Appender, error) {
|
|
primary, err := f.primary.Appender()
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
secondaries := make([]Appender, 0, len(f.secondaries))
|
|
for _, storage := range f.secondaries {
|
|
appender, err := storage.Appender()
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
secondaries = append(secondaries, appender)
|
|
}
|
|
return &fanoutAppender{
|
|
logger: f.logger,
|
|
primary: primary,
|
|
secondaries: secondaries,
|
|
}, nil
|
|
}
|
|
|
|
// Close closes the storage and all its underlying resources.
|
|
func (f *fanout) Close() error {
|
|
if err := f.primary.Close(); err != nil {
|
|
return err
|
|
}
|
|
|
|
// TODO return multiple errors?
|
|
var lastErr error
|
|
for _, storage := range f.secondaries {
|
|
if err := storage.Close(); err != nil {
|
|
lastErr = err
|
|
}
|
|
}
|
|
return lastErr
|
|
}
|
|
|
|
// fanoutAppender implements Appender.
|
|
type fanoutAppender struct {
|
|
logger log.Logger
|
|
|
|
primary Appender
|
|
secondaries []Appender
|
|
}
|
|
|
|
func (f *fanoutAppender) Add(l labels.Labels, t int64, v float64) (uint64, error) {
|
|
ref, err := f.primary.Add(l, t, v)
|
|
if err != nil {
|
|
return ref, err
|
|
}
|
|
|
|
for _, appender := range f.secondaries {
|
|
if _, err := appender.Add(l, t, v); err != nil {
|
|
return 0, err
|
|
}
|
|
}
|
|
return ref, nil
|
|
}
|
|
|
|
func (f *fanoutAppender) AddFast(l labels.Labels, ref uint64, t int64, v float64) error {
|
|
if err := f.primary.AddFast(l, ref, t, v); err != nil {
|
|
return err
|
|
}
|
|
|
|
for _, appender := range f.secondaries {
|
|
if _, err := appender.Add(l, t, v); err != nil {
|
|
return err
|
|
}
|
|
}
|
|
return nil
|
|
}
|
|
|
|
func (f *fanoutAppender) Commit() (err error) {
|
|
err = f.primary.Commit()
|
|
|
|
for _, appender := range f.secondaries {
|
|
if err == nil {
|
|
err = appender.Commit()
|
|
} else {
|
|
if rollbackErr := appender.Rollback(); rollbackErr != nil {
|
|
level.Error(f.logger).Log("msg", "Squashed rollback error on commit", "err", rollbackErr)
|
|
}
|
|
}
|
|
}
|
|
return
|
|
}
|
|
|
|
func (f *fanoutAppender) Rollback() (err error) {
|
|
err = f.primary.Rollback()
|
|
|
|
for _, appender := range f.secondaries {
|
|
rollbackErr := appender.Rollback()
|
|
if err == nil {
|
|
err = rollbackErr
|
|
} else if rollbackErr != nil {
|
|
level.Error(f.logger).Log("msg", "Squashed rollback error on rollback", "err", rollbackErr)
|
|
}
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// mergeQuerier implements Querier.
|
|
type mergeQuerier struct {
|
|
primaryQuerier Querier
|
|
queriers []Querier
|
|
|
|
failedQueriers map[Querier]struct{}
|
|
setQuerierMap map[SeriesSet]Querier
|
|
}
|
|
|
|
// NewMergeQuerier returns a new Querier that merges results of input queriers.
|
|
// NB NewMergeQuerier will return NoopQuerier if no queriers are passed to it,
|
|
// and will filter NoopQueriers from its arguments, in order to reduce overhead
|
|
// when only one querier is passed.
|
|
func NewMergeQuerier(primaryQuerier Querier, queriers []Querier) Querier {
|
|
filtered := make([]Querier, 0, len(queriers))
|
|
for _, querier := range queriers {
|
|
if querier != NoopQuerier() {
|
|
filtered = append(filtered, querier)
|
|
}
|
|
}
|
|
|
|
setQuerierMap := make(map[SeriesSet]Querier)
|
|
failedQueriers := make(map[Querier]struct{})
|
|
|
|
switch len(filtered) {
|
|
case 0:
|
|
return NoopQuerier()
|
|
case 1:
|
|
return filtered[0]
|
|
default:
|
|
return &mergeQuerier{
|
|
primaryQuerier: primaryQuerier,
|
|
queriers: filtered,
|
|
failedQueriers: failedQueriers,
|
|
setQuerierMap: setQuerierMap,
|
|
}
|
|
}
|
|
}
|
|
|
|
// Select returns a set of series that matches the given label matchers.
|
|
func (q *mergeQuerier) Select(params *SelectParams, matchers ...*labels.Matcher) (SeriesSet, Warnings, error) {
|
|
if len(q.queriers) != 1 {
|
|
// We need to sort for NewMergeSeriesSet to work.
|
|
return q.SelectSorted(params, matchers...)
|
|
}
|
|
return q.queriers[0].Select(params, matchers...)
|
|
}
|
|
|
|
// SelectSorted returns a set of sorted series that matches the given label matchers.
|
|
func (q *mergeQuerier) SelectSorted(params *SelectParams, matchers ...*labels.Matcher) (SeriesSet, Warnings, error) {
|
|
seriesSets := make([]SeriesSet, 0, len(q.queriers))
|
|
var warnings Warnings
|
|
for _, querier := range q.queriers {
|
|
set, wrn, err := querier.SelectSorted(params, matchers...)
|
|
q.setQuerierMap[set] = querier
|
|
if wrn != nil {
|
|
warnings = append(warnings, wrn...)
|
|
}
|
|
if err != nil {
|
|
q.failedQueriers[querier] = struct{}{}
|
|
// If the error source isn't the primary querier, return the error as a warning and continue.
|
|
if querier != q.primaryQuerier {
|
|
warnings = append(warnings, err)
|
|
continue
|
|
} else {
|
|
return nil, nil, err
|
|
}
|
|
}
|
|
seriesSets = append(seriesSets, set)
|
|
}
|
|
return NewMergeSeriesSet(seriesSets, q), warnings, nil
|
|
}
|
|
|
|
// LabelValues returns all potential values for a label name.
|
|
func (q *mergeQuerier) LabelValues(name string) ([]string, Warnings, error) {
|
|
var results [][]string
|
|
var warnings Warnings
|
|
for _, querier := range q.queriers {
|
|
values, wrn, err := querier.LabelValues(name)
|
|
|
|
if wrn != nil {
|
|
warnings = append(warnings, wrn...)
|
|
}
|
|
if err != nil {
|
|
q.failedQueriers[querier] = struct{}{}
|
|
// If the error source isn't the primary querier, return the error as a warning and continue.
|
|
if querier != q.primaryQuerier {
|
|
warnings = append(warnings, err)
|
|
continue
|
|
} else {
|
|
return nil, nil, err
|
|
}
|
|
}
|
|
results = append(results, values)
|
|
}
|
|
return mergeStringSlices(results), warnings, nil
|
|
}
|
|
|
|
func (q *mergeQuerier) IsFailedSet(set SeriesSet) bool {
|
|
_, isFailedQuerier := q.failedQueriers[q.setQuerierMap[set]]
|
|
return isFailedQuerier
|
|
}
|
|
|
|
func mergeStringSlices(ss [][]string) []string {
|
|
switch len(ss) {
|
|
case 0:
|
|
return nil
|
|
case 1:
|
|
return ss[0]
|
|
case 2:
|
|
return mergeTwoStringSlices(ss[0], ss[1])
|
|
default:
|
|
halfway := len(ss) / 2
|
|
return mergeTwoStringSlices(
|
|
mergeStringSlices(ss[:halfway]),
|
|
mergeStringSlices(ss[halfway:]),
|
|
)
|
|
}
|
|
}
|
|
|
|
// mergeTwoStringSlices walks a and b in lockstep, always emitting the smaller
// of the two current strings; when both are equal the value is emitted once
// and both cursors advance. Whatever remains of either input is appended
// unchanged.
// NOTE(review): the result is only sorted and free of cross-slice duplicates
// when both inputs are already sorted ascending — confirm callers do so.
func mergeTwoStringSlices(a, b []string) []string {
	merged := make([]string, 0, len(a)+len(b))
	ai, bi := 0, 0
	for ai < len(a) && bi < len(b) {
		switch cmp := strings.Compare(a[ai], b[bi]); {
		case cmp < 0:
			merged = append(merged, a[ai])
			ai++
		case cmp > 0:
			merged = append(merged, b[bi])
			bi++
		default:
			merged = append(merged, a[ai])
			ai++
			bi++
		}
	}
	merged = append(merged, a[ai:]...)
	return append(merged, b[bi:]...)
}
|
|
|
|
// LabelNames returns all the unique label names present in the block in sorted order.
|
|
func (q *mergeQuerier) LabelNames() ([]string, Warnings, error) {
|
|
labelNamesMap := make(map[string]struct{})
|
|
var warnings Warnings
|
|
for _, b := range q.queriers {
|
|
names, wrn, err := b.LabelNames()
|
|
if wrn != nil {
|
|
warnings = append(warnings, wrn...)
|
|
}
|
|
|
|
if err != nil {
|
|
// If the error source isn't the primary querier, return the error as a warning and continue.
|
|
if b != q.primaryQuerier {
|
|
warnings = append(warnings, err)
|
|
continue
|
|
} else {
|
|
return nil, nil, errors.Wrap(err, "LabelNames() from Querier")
|
|
}
|
|
}
|
|
|
|
for _, name := range names {
|
|
labelNamesMap[name] = struct{}{}
|
|
}
|
|
}
|
|
|
|
labelNames := make([]string, 0, len(labelNamesMap))
|
|
for name := range labelNamesMap {
|
|
labelNames = append(labelNames, name)
|
|
}
|
|
sort.Strings(labelNames)
|
|
|
|
return labelNames, warnings, nil
|
|
}
|
|
|
|
// Close releases the resources of the Querier.
|
|
func (q *mergeQuerier) Close() error {
|
|
// TODO return multiple errors?
|
|
var lastErr error
|
|
for _, querier := range q.queriers {
|
|
if err := querier.Close(); err != nil {
|
|
lastErr = err
|
|
}
|
|
}
|
|
return lastErr
|
|
}
|
|
|
|
// mergeSeriesSet implements SeriesSet
|
|
type mergeSeriesSet struct {
|
|
currentLabels labels.Labels
|
|
currentSets []SeriesSet
|
|
heap seriesSetHeap
|
|
sets []SeriesSet
|
|
|
|
querier *mergeQuerier
|
|
}
|
|
|
|
// NewMergeSeriesSet returns a new series set that merges (deduplicates)
|
|
// series returned by the input series sets when iterating.
|
|
func NewMergeSeriesSet(sets []SeriesSet, querier *mergeQuerier) SeriesSet {
|
|
if len(sets) == 1 {
|
|
return sets[0]
|
|
}
|
|
|
|
// Sets need to be pre-advanced, so we can introspect the label of the
|
|
// series under the cursor.
|
|
var h seriesSetHeap
|
|
for _, set := range sets {
|
|
if set == nil {
|
|
continue
|
|
}
|
|
if set.Next() {
|
|
heap.Push(&h, set)
|
|
}
|
|
}
|
|
return &mergeSeriesSet{
|
|
heap: h,
|
|
sets: sets,
|
|
querier: querier,
|
|
}
|
|
}
|
|
|
|
func (c *mergeSeriesSet) Next() bool {
|
|
// Run in a loop because the "next" series sets may not be valid anymore.
|
|
// If a remote querier fails, we discard all series sets from that querier.
|
|
// If, for the current label set, all the next series sets come from
|
|
// failed remote storage sources, we want to keep trying with the next label set.
|
|
for {
|
|
// Firstly advance all the current series sets. If any of them have run out
|
|
// we can drop them, otherwise they should be inserted back into the heap.
|
|
for _, set := range c.currentSets {
|
|
if set.Next() {
|
|
heap.Push(&c.heap, set)
|
|
}
|
|
}
|
|
if len(c.heap) == 0 {
|
|
return false
|
|
}
|
|
|
|
// Now, pop items of the heap that have equal label sets.
|
|
c.currentSets = nil
|
|
c.currentLabels = c.heap[0].At().Labels()
|
|
for len(c.heap) > 0 && labels.Equal(c.currentLabels, c.heap[0].At().Labels()) {
|
|
set := heap.Pop(&c.heap).(SeriesSet)
|
|
if c.querier != nil && c.querier.IsFailedSet(set) {
|
|
continue
|
|
}
|
|
c.currentSets = append(c.currentSets, set)
|
|
}
|
|
|
|
// As long as the current set contains at least 1 set,
|
|
// then it should return true.
|
|
if len(c.currentSets) != 0 {
|
|
break
|
|
}
|
|
}
|
|
return true
|
|
}
|
|
|
|
func (c *mergeSeriesSet) At() Series {
|
|
if len(c.currentSets) == 1 {
|
|
return c.currentSets[0].At()
|
|
}
|
|
series := []Series{}
|
|
for _, seriesSet := range c.currentSets {
|
|
series = append(series, seriesSet.At())
|
|
}
|
|
return &mergeSeries{
|
|
labels: c.currentLabels,
|
|
series: series,
|
|
}
|
|
}
|
|
|
|
func (c *mergeSeriesSet) Err() error {
|
|
for _, set := range c.sets {
|
|
if err := set.Err(); err != nil {
|
|
return err
|
|
}
|
|
}
|
|
return nil
|
|
}
|
|
|
|
type seriesSetHeap []SeriesSet
|
|
|
|
func (h seriesSetHeap) Len() int { return len(h) }
|
|
func (h seriesSetHeap) Swap(i, j int) { h[i], h[j] = h[j], h[i] }
|
|
|
|
func (h seriesSetHeap) Less(i, j int) bool {
|
|
a, b := h[i].At().Labels(), h[j].At().Labels()
|
|
return labels.Compare(a, b) < 0
|
|
}
|
|
|
|
func (h *seriesSetHeap) Push(x interface{}) {
|
|
*h = append(*h, x.(SeriesSet))
|
|
}
|
|
|
|
func (h *seriesSetHeap) Pop() interface{} {
|
|
old := *h
|
|
n := len(old)
|
|
x := old[n-1]
|
|
*h = old[0 : n-1]
|
|
return x
|
|
}
|
|
|
|
type mergeSeries struct {
|
|
labels labels.Labels
|
|
series []Series
|
|
}
|
|
|
|
func (m *mergeSeries) Labels() labels.Labels {
|
|
return m.labels
|
|
}
|
|
|
|
func (m *mergeSeries) Iterator() SeriesIterator {
|
|
iterators := make([]SeriesIterator, 0, len(m.series))
|
|
for _, s := range m.series {
|
|
iterators = append(iterators, s.Iterator())
|
|
}
|
|
return newMergeIterator(iterators)
|
|
}
|
|
|
|
type mergeIterator struct {
|
|
iterators []SeriesIterator
|
|
h seriesIteratorHeap
|
|
}
|
|
|
|
func newMergeIterator(iterators []SeriesIterator) SeriesIterator {
|
|
return &mergeIterator{
|
|
iterators: iterators,
|
|
h: nil,
|
|
}
|
|
}
|
|
|
|
func (c *mergeIterator) Seek(t int64) bool {
|
|
c.h = seriesIteratorHeap{}
|
|
for _, iter := range c.iterators {
|
|
if iter.Seek(t) {
|
|
heap.Push(&c.h, iter)
|
|
}
|
|
}
|
|
return len(c.h) > 0
|
|
}
|
|
|
|
func (c *mergeIterator) At() (t int64, v float64) {
|
|
if len(c.h) == 0 {
|
|
panic("mergeIterator.At() called after .Next() returned false.")
|
|
}
|
|
|
|
return c.h[0].At()
|
|
}
|
|
|
|
func (c *mergeIterator) Next() bool {
|
|
if c.h == nil {
|
|
for _, iter := range c.iterators {
|
|
if iter.Next() {
|
|
heap.Push(&c.h, iter)
|
|
}
|
|
}
|
|
|
|
return len(c.h) > 0
|
|
}
|
|
|
|
if len(c.h) == 0 {
|
|
return false
|
|
}
|
|
|
|
currt, _ := c.At()
|
|
for len(c.h) > 0 {
|
|
nextt, _ := c.h[0].At()
|
|
if nextt != currt {
|
|
break
|
|
}
|
|
|
|
iter := heap.Pop(&c.h).(SeriesIterator)
|
|
if iter.Next() {
|
|
heap.Push(&c.h, iter)
|
|
}
|
|
}
|
|
|
|
return len(c.h) > 0
|
|
}
|
|
|
|
func (c *mergeIterator) Err() error {
|
|
for _, iter := range c.iterators {
|
|
if err := iter.Err(); err != nil {
|
|
return err
|
|
}
|
|
}
|
|
return nil
|
|
}
|
|
|
|
type seriesIteratorHeap []SeriesIterator
|
|
|
|
func (h seriesIteratorHeap) Len() int { return len(h) }
|
|
func (h seriesIteratorHeap) Swap(i, j int) { h[i], h[j] = h[j], h[i] }
|
|
|
|
func (h seriesIteratorHeap) Less(i, j int) bool {
|
|
at, _ := h[i].At()
|
|
bt, _ := h[j].At()
|
|
return at < bt
|
|
}
|
|
|
|
func (h *seriesIteratorHeap) Push(x interface{}) {
|
|
*h = append(*h, x.(SeriesIterator))
|
|
}
|
|
|
|
func (h *seriesIteratorHeap) Pop() interface{} {
|
|
old := *h
|
|
n := len(old)
|
|
x := old[n-1]
|
|
*h = old[0 : n-1]
|
|
return x
|
|
}
|