mirror of
https://github.com/prometheus/prometheus
synced 2025-01-13 10:22:46 +00:00
be13a4ba7e
Signed-off-by: Callum Styan <callumstyan@gmail.com>
839 lines
22 KiB
Go
839 lines
22 KiB
Go
// Copyright 2017 The Prometheus Authors
|
|
// Licensed under the Apache License, Version 2.0 (the "License");
|
|
// you may not use this file except in compliance with the License.
|
|
// You may obtain a copy of the License at
|
|
//
|
|
// http://www.apache.org/licenses/LICENSE-2.0
|
|
//
|
|
// Unless required by applicable law or agreed to in writing, software
|
|
// distributed under the License is distributed on an "AS IS" BASIS,
|
|
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
// See the License for the specific language governing permissions and
|
|
// limitations under the License.
|
|
|
|
package storage
|
|
|
|
import (
|
|
"container/heap"
|
|
"context"
|
|
"reflect"
|
|
"sort"
|
|
"strings"
|
|
|
|
"github.com/go-kit/kit/log"
|
|
"github.com/go-kit/kit/log/level"
|
|
"github.com/pkg/errors"
|
|
"github.com/prometheus/common/model"
|
|
"github.com/prometheus/prometheus/pkg/labels"
|
|
"github.com/prometheus/prometheus/tsdb/chunkenc"
|
|
"github.com/prometheus/prometheus/tsdb/chunks"
|
|
tsdb_errors "github.com/prometheus/prometheus/tsdb/errors"
|
|
)
|
|
|
|
type fanout struct {
|
|
logger log.Logger
|
|
|
|
primary Storage
|
|
secondaries []Storage
|
|
}
|
|
|
|
// NewFanout returns a new fan-out Storage, which proxies reads and writes
|
|
// through to multiple underlying storages.
|
|
func NewFanout(logger log.Logger, primary Storage, secondaries ...Storage) Storage {
|
|
return &fanout{
|
|
logger: logger,
|
|
primary: primary,
|
|
secondaries: secondaries,
|
|
}
|
|
}
|
|
|
|
// StartTime implements the Storage interface.
|
|
func (f *fanout) StartTime() (int64, error) {
|
|
// StartTime of a fanout should be the earliest StartTime of all its storages,
|
|
// both primary and secondaries.
|
|
firstTime, err := f.primary.StartTime()
|
|
if err != nil {
|
|
return int64(model.Latest), err
|
|
}
|
|
|
|
for _, storage := range f.secondaries {
|
|
t, err := storage.StartTime()
|
|
if err != nil {
|
|
return int64(model.Latest), err
|
|
}
|
|
if t < firstTime {
|
|
firstTime = t
|
|
}
|
|
}
|
|
return firstTime, nil
|
|
}
|
|
|
|
func (f *fanout) Querier(ctx context.Context, mint, maxt int64) (Querier, error) {
|
|
queriers := make([]Querier, 0, 1+len(f.secondaries))
|
|
|
|
// Add primary querier.
|
|
primaryQuerier, err := f.primary.Querier(ctx, mint, maxt)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
queriers = append(queriers, primaryQuerier)
|
|
|
|
// Add secondary queriers.
|
|
for _, storage := range f.secondaries {
|
|
querier, err := storage.Querier(ctx, mint, maxt)
|
|
if err != nil {
|
|
for _, q := range queriers {
|
|
// TODO(bwplotka): Log error.
|
|
_ = q.Close()
|
|
}
|
|
return nil, err
|
|
}
|
|
queriers = append(queriers, querier)
|
|
}
|
|
|
|
return NewMergeQuerier(primaryQuerier, queriers, ChainedSeriesMerge), nil
|
|
}
|
|
|
|
func (f *fanout) Appender() Appender {
|
|
primary := f.primary.Appender()
|
|
secondaries := make([]Appender, 0, len(f.secondaries))
|
|
for _, storage := range f.secondaries {
|
|
secondaries = append(secondaries, storage.Appender())
|
|
}
|
|
return &fanoutAppender{
|
|
logger: f.logger,
|
|
primary: primary,
|
|
secondaries: secondaries,
|
|
}
|
|
}
|
|
|
|
// Close closes the storage and all its underlying resources.
|
|
func (f *fanout) Close() error {
|
|
if err := f.primary.Close(); err != nil {
|
|
return err
|
|
}
|
|
|
|
// TODO return multiple errors?
|
|
var lastErr error
|
|
for _, storage := range f.secondaries {
|
|
if err := storage.Close(); err != nil {
|
|
lastErr = err
|
|
}
|
|
}
|
|
return lastErr
|
|
}
|
|
|
|
// fanoutAppender implements Appender.
|
|
type fanoutAppender struct {
|
|
logger log.Logger
|
|
|
|
primary Appender
|
|
secondaries []Appender
|
|
}
|
|
|
|
func (f *fanoutAppender) Add(l labels.Labels, t int64, v float64) (uint64, error) {
|
|
ref, err := f.primary.Add(l, t, v)
|
|
if err != nil {
|
|
return ref, err
|
|
}
|
|
|
|
for _, appender := range f.secondaries {
|
|
if _, err := appender.Add(l, t, v); err != nil {
|
|
return 0, err
|
|
}
|
|
}
|
|
return ref, nil
|
|
}
|
|
|
|
func (f *fanoutAppender) AddFast(ref uint64, t int64, v float64) error {
|
|
if err := f.primary.AddFast(ref, t, v); err != nil {
|
|
return err
|
|
}
|
|
|
|
for _, appender := range f.secondaries {
|
|
if err := appender.AddFast(ref, t, v); err != nil {
|
|
return err
|
|
}
|
|
}
|
|
return nil
|
|
}
|
|
|
|
func (f *fanoutAppender) Commit() (err error) {
|
|
err = f.primary.Commit()
|
|
|
|
for _, appender := range f.secondaries {
|
|
if err == nil {
|
|
err = appender.Commit()
|
|
} else {
|
|
if rollbackErr := appender.Rollback(); rollbackErr != nil {
|
|
level.Error(f.logger).Log("msg", "Squashed rollback error on commit", "err", rollbackErr)
|
|
}
|
|
}
|
|
}
|
|
return
|
|
}
|
|
|
|
func (f *fanoutAppender) Rollback() (err error) {
|
|
err = f.primary.Rollback()
|
|
|
|
for _, appender := range f.secondaries {
|
|
rollbackErr := appender.Rollback()
|
|
if err == nil {
|
|
err = rollbackErr
|
|
} else if rollbackErr != nil {
|
|
level.Error(f.logger).Log("msg", "Squashed rollback error on rollback", "err", rollbackErr)
|
|
}
|
|
}
|
|
return nil
|
|
}
|
|
|
|
type mergeGenericQuerier struct {
|
|
mergeFunc genericSeriesMergeFunc
|
|
|
|
primaryQuerier genericQuerier
|
|
queriers []genericQuerier
|
|
failedQueriers map[genericQuerier]struct{}
|
|
setQuerierMap map[genericSeriesSet]genericQuerier
|
|
}
|
|
|
|
// NewMergeQuerier returns a new Querier that merges results of chkQuerierSeries queriers.
|
|
// NewMergeQuerier will return NoopQuerier if no queriers are passed to it
|
|
// and will filter NoopQueriers from its arguments, in order to reduce overhead
|
|
// when only one querier is passed.
|
|
// The difference between primary and secondary is as follows: f the primaryQuerier returns an error, query fails.
|
|
// For secondaries it just return warnings.
|
|
func NewMergeQuerier(primaryQuerier Querier, queriers []Querier, mergeFunc VerticalSeriesMergeFunc) Querier {
|
|
filtered := make([]genericQuerier, 0, len(queriers))
|
|
for _, querier := range queriers {
|
|
if _, ok := querier.(noopQuerier); !ok && querier != nil {
|
|
filtered = append(filtered, newGenericQuerierFrom(querier))
|
|
}
|
|
}
|
|
|
|
if len(filtered) == 0 {
|
|
return primaryQuerier
|
|
}
|
|
|
|
if primaryQuerier == nil && len(filtered) == 1 {
|
|
return &querierAdapter{filtered[0]}
|
|
}
|
|
|
|
return &querierAdapter{&mergeGenericQuerier{
|
|
mergeFunc: (&seriesMergerAdapter{VerticalSeriesMergeFunc: mergeFunc}).Merge,
|
|
primaryQuerier: newGenericQuerierFrom(primaryQuerier),
|
|
queriers: filtered,
|
|
failedQueriers: make(map[genericQuerier]struct{}),
|
|
setQuerierMap: make(map[genericSeriesSet]genericQuerier),
|
|
}}
|
|
}
|
|
|
|
// NewMergeChunkQuerier returns a new ChunkQuerier that merges results of chkQuerierSeries chunk queriers.
|
|
// NewMergeChunkQuerier will return NoopChunkQuerier if no chunk queriers are passed to it,
|
|
// and will filter NoopQuerieNoopChunkQuerierrs from its arguments, in order to reduce overhead
|
|
// when only one chunk querier is passed.
|
|
func NewMergeChunkQuerier(primaryQuerier ChunkQuerier, queriers []ChunkQuerier, merger VerticalChunkSeriesMergerFunc) ChunkQuerier {
|
|
filtered := make([]genericQuerier, 0, len(queriers))
|
|
for _, querier := range queriers {
|
|
if _, ok := querier.(noopChunkQuerier); !ok && querier != nil {
|
|
filtered = append(filtered, newGenericQuerierFromChunk(querier))
|
|
}
|
|
}
|
|
|
|
if len(filtered) == 0 {
|
|
return primaryQuerier
|
|
}
|
|
|
|
if primaryQuerier == nil && len(filtered) == 1 {
|
|
return &chunkQuerierAdapter{filtered[0]}
|
|
}
|
|
|
|
return &chunkQuerierAdapter{&mergeGenericQuerier{
|
|
mergeFunc: (&chunkSeriesMergerAdapter{VerticalChunkSeriesMergerFunc: merger}).Merge,
|
|
primaryQuerier: newGenericQuerierFromChunk(primaryQuerier),
|
|
queriers: filtered,
|
|
failedQueriers: make(map[genericQuerier]struct{}),
|
|
setQuerierMap: make(map[genericSeriesSet]genericQuerier),
|
|
}}
|
|
}
|
|
|
|
// Select returns a set of series that matches the given label matchers.
|
|
func (q *mergeGenericQuerier) Select(sortSeries bool, hints *SelectHints, matchers ...*labels.Matcher) (genericSeriesSet, Warnings, error) {
|
|
if len(q.queriers) == 1 {
|
|
return q.queriers[0].Select(sortSeries, hints, matchers...)
|
|
}
|
|
|
|
var (
|
|
seriesSets = make([]genericSeriesSet, 0, len(q.queriers))
|
|
warnings Warnings
|
|
priErr error
|
|
)
|
|
type queryResult struct {
|
|
qr genericQuerier
|
|
set genericSeriesSet
|
|
wrn Warnings
|
|
selectError error
|
|
}
|
|
queryResultChan := make(chan *queryResult)
|
|
|
|
for _, querier := range q.queriers {
|
|
go func(qr genericQuerier) {
|
|
// We need to sort for NewMergeSeriesSet to work.
|
|
set, wrn, err := qr.Select(true, hints, matchers...)
|
|
queryResultChan <- &queryResult{qr: qr, set: set, wrn: wrn, selectError: err}
|
|
}(querier)
|
|
}
|
|
for i := 0; i < len(q.queriers); i++ {
|
|
qryResult := <-queryResultChan
|
|
q.setQuerierMap[qryResult.set] = qryResult.qr
|
|
if qryResult.wrn != nil {
|
|
warnings = append(warnings, qryResult.wrn...)
|
|
}
|
|
if qryResult.selectError != nil {
|
|
q.failedQueriers[qryResult.qr] = struct{}{}
|
|
// If the error source isn't the primary querier, return the error as a warning and continue.
|
|
if !reflect.DeepEqual(qryResult.qr, q.primaryQuerier) {
|
|
warnings = append(warnings, qryResult.selectError)
|
|
} else {
|
|
priErr = qryResult.selectError
|
|
}
|
|
continue
|
|
}
|
|
seriesSets = append(seriesSets, qryResult.set)
|
|
}
|
|
if priErr != nil {
|
|
return nil, nil, priErr
|
|
}
|
|
return newGenericMergeSeriesSet(seriesSets, q, q.mergeFunc), warnings, nil
|
|
}
|
|
|
|
// LabelValues returns all potential values for a label name.
|
|
func (q *mergeGenericQuerier) LabelValues(name string) ([]string, Warnings, error) {
|
|
var results [][]string
|
|
var warnings Warnings
|
|
for _, querier := range q.queriers {
|
|
values, wrn, err := querier.LabelValues(name)
|
|
if wrn != nil {
|
|
warnings = append(warnings, wrn...)
|
|
}
|
|
if err != nil {
|
|
q.failedQueriers[querier] = struct{}{}
|
|
// If the error source isn't the primary querier, return the error as a warning and continue.
|
|
if querier != q.primaryQuerier {
|
|
warnings = append(warnings, err)
|
|
continue
|
|
} else {
|
|
return nil, nil, err
|
|
}
|
|
}
|
|
results = append(results, values)
|
|
}
|
|
return mergeStringSlices(results), warnings, nil
|
|
}
|
|
|
|
func (q *mergeGenericQuerier) IsFailedSet(set genericSeriesSet) bool {
|
|
_, isFailedQuerier := q.failedQueriers[q.setQuerierMap[set]]
|
|
return isFailedQuerier
|
|
}
|
|
|
|
func mergeStringSlices(ss [][]string) []string {
|
|
switch len(ss) {
|
|
case 0:
|
|
return nil
|
|
case 1:
|
|
return ss[0]
|
|
case 2:
|
|
return mergeTwoStringSlices(ss[0], ss[1])
|
|
default:
|
|
halfway := len(ss) / 2
|
|
return mergeTwoStringSlices(
|
|
mergeStringSlices(ss[:halfway]),
|
|
mergeStringSlices(ss[halfway:]),
|
|
)
|
|
}
|
|
}
|
|
|
|
func mergeTwoStringSlices(a, b []string) []string {
|
|
i, j := 0, 0
|
|
result := make([]string, 0, len(a)+len(b))
|
|
for i < len(a) && j < len(b) {
|
|
switch strings.Compare(a[i], b[j]) {
|
|
case 0:
|
|
result = append(result, a[i])
|
|
i++
|
|
j++
|
|
case -1:
|
|
result = append(result, a[i])
|
|
i++
|
|
case 1:
|
|
result = append(result, b[j])
|
|
j++
|
|
}
|
|
}
|
|
result = append(result, a[i:]...)
|
|
result = append(result, b[j:]...)
|
|
return result
|
|
}
|
|
|
|
// LabelNames returns all the unique label names present in the block in sorted order.
|
|
func (q *mergeGenericQuerier) LabelNames() ([]string, Warnings, error) {
|
|
labelNamesMap := make(map[string]struct{})
|
|
var warnings Warnings
|
|
for _, querier := range q.queriers {
|
|
names, wrn, err := querier.LabelNames()
|
|
if wrn != nil {
|
|
warnings = append(warnings, wrn...)
|
|
}
|
|
|
|
if err != nil {
|
|
q.failedQueriers[querier] = struct{}{}
|
|
// If the error source isn't the primaryQuerier querier, return the error as a warning and continue.
|
|
if querier != q.primaryQuerier {
|
|
warnings = append(warnings, err)
|
|
continue
|
|
} else {
|
|
return nil, nil, errors.Wrap(err, "LabelNames() from Querier")
|
|
}
|
|
}
|
|
|
|
for _, name := range names {
|
|
labelNamesMap[name] = struct{}{}
|
|
}
|
|
}
|
|
|
|
labelNames := make([]string, 0, len(labelNamesMap))
|
|
for name := range labelNamesMap {
|
|
labelNames = append(labelNames, name)
|
|
}
|
|
sort.Strings(labelNames)
|
|
|
|
return labelNames, warnings, nil
|
|
}
|
|
|
|
// Close releases the resources of the Querier.
|
|
func (q *mergeGenericQuerier) Close() error {
|
|
var errs tsdb_errors.MultiError
|
|
for _, querier := range q.queriers {
|
|
if err := querier.Close(); err != nil {
|
|
errs.Add(err)
|
|
}
|
|
}
|
|
return errs.Err()
|
|
}
|
|
|
|
// genericMergeSeriesSet implements genericSeriesSet
|
|
type genericMergeSeriesSet struct {
|
|
currentLabels labels.Labels
|
|
mergeFunc genericSeriesMergeFunc
|
|
|
|
heap genericSeriesSetHeap
|
|
sets []genericSeriesSet
|
|
|
|
currentSets []genericSeriesSet
|
|
querier *mergeGenericQuerier
|
|
}
|
|
|
|
// VerticalSeriesMergeFunc returns merged series implementation that merges series with same labels together.
|
|
// It has to handle time-overlapped series as well.
|
|
type VerticalSeriesMergeFunc func(...Series) Series
|
|
|
|
// VerticalChunkSeriesMergerFunc returns merged chunk series implementation that merges series with same labels together.
|
|
// It has to handle time-overlapped chunk series as well.
|
|
type VerticalChunkSeriesMergerFunc func(...ChunkSeries) ChunkSeries
|
|
|
|
// NewMergeSeriesSet returns a new SeriesSet that merges results of chkQuerierSeries SeriesSets.
|
|
func NewMergeSeriesSet(sets []SeriesSet, merger VerticalSeriesMergeFunc) SeriesSet {
|
|
genericSets := make([]genericSeriesSet, 0, len(sets))
|
|
for _, s := range sets {
|
|
genericSets = append(genericSets, &genericSeriesSetAdapter{s})
|
|
|
|
}
|
|
return &seriesSetAdapter{newGenericMergeSeriesSet(genericSets, nil, (&seriesMergerAdapter{VerticalSeriesMergeFunc: merger}).Merge)}
|
|
}
|
|
|
|
// NewMergeChunkSeriesSet returns a new ChunkSeriesSet that merges results of chkQuerierSeries ChunkSeriesSets.
|
|
func NewMergeChunkSeriesSet(sets []ChunkSeriesSet, merger VerticalChunkSeriesMergerFunc) ChunkSeriesSet {
|
|
genericSets := make([]genericSeriesSet, 0, len(sets))
|
|
for _, s := range sets {
|
|
genericSets = append(genericSets, &genericChunkSeriesSetAdapter{s})
|
|
|
|
}
|
|
return &chunkSeriesSetAdapter{newGenericMergeSeriesSet(genericSets, nil, (&chunkSeriesMergerAdapter{VerticalChunkSeriesMergerFunc: merger}).Merge)}
|
|
}
|
|
|
|
// newGenericMergeSeriesSet returns a new genericSeriesSet that merges (and deduplicates)
|
|
// series returned by the chkQuerierSeries series sets when iterating.
|
|
// Each chkQuerierSeries series set must return its series in labels order, otherwise
|
|
// merged series set will be incorrect.
|
|
// Argument 'querier' is optional and can be nil. Pass Querier if you want to retry query in case of failing series set.
|
|
// Overlapped situations are merged using provided mergeFunc.
|
|
func newGenericMergeSeriesSet(sets []genericSeriesSet, querier *mergeGenericQuerier, mergeFunc genericSeriesMergeFunc) genericSeriesSet {
|
|
if len(sets) == 1 {
|
|
return sets[0]
|
|
}
|
|
|
|
// Sets need to be pre-advanced, so we can introspect the label of the
|
|
// series under the cursor.
|
|
var h genericSeriesSetHeap
|
|
for _, set := range sets {
|
|
if set == nil {
|
|
continue
|
|
}
|
|
if set.Next() {
|
|
heap.Push(&h, set)
|
|
}
|
|
}
|
|
return &genericMergeSeriesSet{
|
|
mergeFunc: mergeFunc,
|
|
heap: h,
|
|
sets: sets,
|
|
querier: querier,
|
|
}
|
|
}
|
|
|
|
func (c *genericMergeSeriesSet) Next() bool {
|
|
// Run in a loop because the "next" series sets may not be valid anymore.
|
|
// If a remote querier fails, we discard all series sets from that querier.
|
|
// If, for the current label set, all the next series sets come from
|
|
// failed remote storage sources, we want to keep trying with the next label set.
|
|
for {
|
|
// Firstly advance all the current series sets. If any of them have run out
|
|
// we can drop them, otherwise they should be inserted back into the heap.
|
|
for _, set := range c.currentSets {
|
|
if set.Next() {
|
|
heap.Push(&c.heap, set)
|
|
}
|
|
}
|
|
if len(c.heap) == 0 {
|
|
return false
|
|
}
|
|
|
|
// Now, pop items of the heap that have equal label sets.
|
|
c.currentSets = nil
|
|
c.currentLabels = c.heap[0].At().Labels()
|
|
for len(c.heap) > 0 && labels.Equal(c.currentLabels, c.heap[0].At().Labels()) {
|
|
set := heap.Pop(&c.heap).(genericSeriesSet)
|
|
if c.querier != nil && c.querier.IsFailedSet(set) {
|
|
continue
|
|
}
|
|
c.currentSets = append(c.currentSets, set)
|
|
}
|
|
|
|
// As long as the current set contains at least 1 set,
|
|
// then it should return true.
|
|
if len(c.currentSets) != 0 {
|
|
break
|
|
}
|
|
}
|
|
return true
|
|
}
|
|
|
|
func (c *genericMergeSeriesSet) At() Labels {
|
|
if len(c.currentSets) == 1 {
|
|
return c.currentSets[0].At()
|
|
}
|
|
series := make([]Labels, 0, len(c.currentSets))
|
|
for _, seriesSet := range c.currentSets {
|
|
series = append(series, seriesSet.At())
|
|
}
|
|
return c.mergeFunc(series...)
|
|
}
|
|
|
|
func (c *genericMergeSeriesSet) Err() error {
|
|
for _, set := range c.sets {
|
|
if err := set.Err(); err != nil {
|
|
return err
|
|
}
|
|
}
|
|
return nil
|
|
}
|
|
|
|
type genericSeriesSetHeap []genericSeriesSet
|
|
|
|
func (h genericSeriesSetHeap) Len() int { return len(h) }
|
|
func (h genericSeriesSetHeap) Swap(i, j int) { h[i], h[j] = h[j], h[i] }
|
|
|
|
func (h genericSeriesSetHeap) Less(i, j int) bool {
|
|
a, b := h[i].At().Labels(), h[j].At().Labels()
|
|
return labels.Compare(a, b) < 0
|
|
}
|
|
|
|
func (h *genericSeriesSetHeap) Push(x interface{}) {
|
|
*h = append(*h, x.(genericSeriesSet))
|
|
}
|
|
|
|
func (h *genericSeriesSetHeap) Pop() interface{} {
|
|
old := *h
|
|
n := len(old)
|
|
x := old[n-1]
|
|
*h = old[0 : n-1]
|
|
return x
|
|
}
|
|
|
|
// ChainedSeriesMerge returns single series from many same series by chaining samples together.
|
|
// In case of the timestamp overlap, the first overlapped sample is kept and the rest samples with the same timestamps
|
|
// are dropped. We expect the same labels for each given series.
|
|
// TODO(bwplotka): This has the same logic as tsdb.verticalChainedSeries. Remove this in favor of ChainedSeriesMerge in next PRs.
|
|
func ChainedSeriesMerge(s ...Series) Series {
|
|
if len(s) == 0 {
|
|
return nil
|
|
}
|
|
return &chainSeries{
|
|
labels: s[0].Labels(),
|
|
series: s,
|
|
}
|
|
}
|
|
|
|
type chainSeries struct {
|
|
labels labels.Labels
|
|
series []Series
|
|
}
|
|
|
|
func (m *chainSeries) Labels() labels.Labels {
|
|
return m.labels
|
|
}
|
|
|
|
func (m *chainSeries) Iterator() chunkenc.Iterator {
|
|
iterators := make([]chunkenc.Iterator, 0, len(m.series))
|
|
for _, s := range m.series {
|
|
iterators = append(iterators, s.Iterator())
|
|
}
|
|
return newChainSampleIterator(iterators)
|
|
}
|
|
|
|
// chainSampleIterator is responsible to iterate over samples from different iterators of the same time series.
|
|
// If one or more samples overlap, the first one is kept and all others with the same timestamp are dropped.
|
|
type chainSampleIterator struct {
|
|
iterators []chunkenc.Iterator
|
|
h samplesIteratorHeap
|
|
}
|
|
|
|
func newChainSampleIterator(iterators []chunkenc.Iterator) chunkenc.Iterator {
|
|
return &chainSampleIterator{
|
|
iterators: iterators,
|
|
h: nil,
|
|
}
|
|
}
|
|
|
|
func (c *chainSampleIterator) Seek(t int64) bool {
|
|
c.h = samplesIteratorHeap{}
|
|
for _, iter := range c.iterators {
|
|
if iter.Seek(t) {
|
|
heap.Push(&c.h, iter)
|
|
}
|
|
}
|
|
return len(c.h) > 0
|
|
}
|
|
|
|
func (c *chainSampleIterator) At() (t int64, v float64) {
|
|
if len(c.h) == 0 {
|
|
panic("chainSampleIterator.At() called after .Next() returned false.")
|
|
}
|
|
|
|
return c.h[0].At()
|
|
}
|
|
|
|
func (c *chainSampleIterator) Next() bool {
|
|
if c.h == nil {
|
|
for _, iter := range c.iterators {
|
|
if iter.Next() {
|
|
heap.Push(&c.h, iter)
|
|
}
|
|
}
|
|
|
|
return len(c.h) > 0
|
|
}
|
|
|
|
if len(c.h) == 0 {
|
|
return false
|
|
}
|
|
|
|
currt, _ := c.At()
|
|
for len(c.h) > 0 {
|
|
nextt, _ := c.h[0].At()
|
|
// All but one of the overlapping samples will be dropped.
|
|
if nextt != currt {
|
|
break
|
|
}
|
|
|
|
iter := heap.Pop(&c.h).(chunkenc.Iterator)
|
|
if iter.Next() {
|
|
heap.Push(&c.h, iter)
|
|
}
|
|
}
|
|
|
|
return len(c.h) > 0
|
|
}
|
|
|
|
func (c *chainSampleIterator) Err() error {
|
|
for _, iter := range c.iterators {
|
|
if err := iter.Err(); err != nil {
|
|
return err
|
|
}
|
|
}
|
|
return nil
|
|
}
|
|
|
|
type samplesIteratorHeap []chunkenc.Iterator
|
|
|
|
func (h samplesIteratorHeap) Len() int { return len(h) }
|
|
func (h samplesIteratorHeap) Swap(i, j int) { h[i], h[j] = h[j], h[i] }
|
|
|
|
func (h samplesIteratorHeap) Less(i, j int) bool {
|
|
at, _ := h[i].At()
|
|
bt, _ := h[j].At()
|
|
return at < bt
|
|
}
|
|
|
|
func (h *samplesIteratorHeap) Push(x interface{}) {
|
|
*h = append(*h, x.(chunkenc.Iterator))
|
|
}
|
|
|
|
func (h *samplesIteratorHeap) Pop() interface{} {
|
|
old := *h
|
|
n := len(old)
|
|
x := old[n-1]
|
|
*h = old[0 : n-1]
|
|
return x
|
|
}
|
|
|
|
// VerticalChunkMergeFunc represents a function that merges multiple time overlapping chunks.
|
|
// Passed chunks:
|
|
// * have to be sorted by MinTime.
|
|
// * have to be part of exactly the same timeseries.
|
|
// * have to be populated.
|
|
type VerticalChunksMergeFunc func(chks ...chunks.Meta) chunks.Iterator
|
|
|
|
type verticalChunkSeriesMerger struct {
|
|
verticalChunksMerger VerticalChunksMergeFunc
|
|
|
|
labels labels.Labels
|
|
series []ChunkSeries
|
|
}
|
|
|
|
// NewVerticalChunkSeriesMerger returns VerticalChunkSeriesMerger that merges the same chunk series into one or more chunks.
|
|
// In case of the chunk overlap, given VerticalChunkMergeFunc will be used.
|
|
// It expects the same labels for each given series.
|
|
func NewVerticalChunkSeriesMerger(chunkMerger VerticalChunksMergeFunc) VerticalChunkSeriesMergerFunc {
|
|
return func(s ...ChunkSeries) ChunkSeries {
|
|
if len(s) == 0 {
|
|
return nil
|
|
}
|
|
return &verticalChunkSeriesMerger{
|
|
verticalChunksMerger: chunkMerger,
|
|
labels: s[0].Labels(),
|
|
series: s,
|
|
}
|
|
}
|
|
}
|
|
|
|
func (s *verticalChunkSeriesMerger) Labels() labels.Labels {
|
|
return s.labels
|
|
}
|
|
|
|
func (s *verticalChunkSeriesMerger) Iterator() chunks.Iterator {
|
|
iterators := make([]chunks.Iterator, 0, len(s.series))
|
|
for _, series := range s.series {
|
|
iterators = append(iterators, series.Iterator())
|
|
}
|
|
return &chainChunkIterator{
|
|
overlappedChunksMerger: s.verticalChunksMerger,
|
|
iterators: iterators,
|
|
h: nil,
|
|
}
|
|
}
|
|
|
|
// chainChunkIterator is responsible to chain chunks from different iterators of same time series.
|
|
// If they are time overlapping overlappedChunksMerger will be used.
|
|
type chainChunkIterator struct {
|
|
overlappedChunksMerger VerticalChunksMergeFunc
|
|
|
|
iterators []chunks.Iterator
|
|
h chunkIteratorHeap
|
|
}
|
|
|
|
func (c *chainChunkIterator) At() chunks.Meta {
|
|
if len(c.h) == 0 {
|
|
panic("chainChunkIterator.At() called after .Next() returned false.")
|
|
}
|
|
|
|
return c.h[0].At()
|
|
}
|
|
|
|
func (c *chainChunkIterator) Next() bool {
|
|
if c.h == nil {
|
|
for _, iter := range c.iterators {
|
|
if iter.Next() {
|
|
heap.Push(&c.h, iter)
|
|
}
|
|
}
|
|
|
|
return len(c.h) > 0
|
|
}
|
|
|
|
if len(c.h) == 0 {
|
|
return false
|
|
}
|
|
|
|
// Detect the shortest chain of time-overlapped chunks.
|
|
last := c.At()
|
|
var overlapped []chunks.Meta
|
|
for {
|
|
iter := heap.Pop(&c.h).(chunks.Iterator)
|
|
if iter.Next() {
|
|
heap.Push(&c.h, iter)
|
|
}
|
|
|
|
if len(c.h) == 0 {
|
|
break
|
|
}
|
|
|
|
next := c.At()
|
|
if next.MinTime > last.MaxTime {
|
|
// No overlap with last one.
|
|
break
|
|
}
|
|
overlapped = append(overlapped, last)
|
|
last = next
|
|
}
|
|
if len(overlapped) > 0 {
|
|
heap.Push(&c.h, c.overlappedChunksMerger(append(overlapped, c.At())...))
|
|
return true
|
|
}
|
|
return len(c.h) > 0
|
|
}
|
|
|
|
func (c *chainChunkIterator) Err() error {
|
|
for _, iter := range c.iterators {
|
|
if err := iter.Err(); err != nil {
|
|
return err
|
|
}
|
|
}
|
|
return nil
|
|
}
|
|
|
|
type chunkIteratorHeap []chunks.Iterator
|
|
|
|
func (h chunkIteratorHeap) Len() int { return len(h) }
|
|
func (h chunkIteratorHeap) Swap(i, j int) { h[i], h[j] = h[j], h[i] }
|
|
|
|
func (h chunkIteratorHeap) Less(i, j int) bool {
|
|
at := h[i].At()
|
|
bt := h[j].At()
|
|
if at.MinTime == bt.MinTime {
|
|
return at.MaxTime < bt.MaxTime
|
|
}
|
|
return at.MinTime < bt.MinTime
|
|
}
|
|
|
|
func (h *chunkIteratorHeap) Push(x interface{}) {
|
|
*h = append(*h, x.(chunks.Iterator))
|
|
}
|
|
|
|
func (h *chunkIteratorHeap) Pop() interface{} {
|
|
old := *h
|
|
n := len(old)
|
|
x := old[n-1]
|
|
*h = old[0 : n-1]
|
|
return x
|
|
}
|