Fixed iterator regression: Avoid using heap for each sample when iterating. (#7900)
* Fixed iterator regression: Avoid using heap for each sample when iterating. Fixes: https://github.com/prometheus/prometheus/issues/7873 Signed-off-by: Bartlomiej Plotka <bwplotka@gmail.com> * Addressed comments. Signed-off-by: Bartlomiej Plotka <bwplotka@gmail.com> * Check for .At() called after .Next() returned false Signed-off-by: Ganesh Vernekar <cs15btech11018@iith.ac.in> * More comments Signed-off-by: Ganesh Vernekar <cs15btech11018@iith.ac.in> Co-authored-by: Ganesh Vernekar <cs15btech11018@iith.ac.in>
This commit is contained in:
parent
01daddeb75
commit
088fcc9e48
|
@ -16,6 +16,7 @@ package storage
|
|||
import (
|
||||
"bytes"
|
||||
"container/heap"
|
||||
"math"
|
||||
"sort"
|
||||
"strings"
|
||||
"sync"
|
||||
|
@ -418,8 +419,7 @@ func (h *genericSeriesSetHeap) Pop() interface{} {
|
|||
// with "almost" the same data, e.g. from 2 Prometheus HA replicas. This is fine, since from the Prometheus perspective
|
||||
// this never happens.
|
||||
//
|
||||
// NOTE: Use this merge function only when you see potentially overlapping series, as this introduces a small overhead
|
||||
// to handle overlaps between series.
|
||||
// It's optimized for non-overlap cases as well.
|
||||
func ChainedSeriesMerge(series ...Series) Series {
|
||||
if len(series) == 0 {
|
||||
return nil
|
||||
|
@ -438,16 +438,20 @@ func ChainedSeriesMerge(series ...Series) Series {
|
|||
|
||||
// chainSampleIterator is responsible to iterate over samples from different iterators of the same time series in timestamps
|
||||
// order. If one or more samples overlap, one sample from random overlapped ones is kept and all others with the same
|
||||
// timestamp are dropped.
|
||||
// timestamp are dropped. It's optimized for non-overlap cases as well.
|
||||
type chainSampleIterator struct {
|
||||
iterators []chunkenc.Iterator
|
||||
h samplesIteratorHeap
|
||||
|
||||
curr chunkenc.Iterator
|
||||
lastt int64
|
||||
}
|
||||
|
||||
func newChainSampleIterator(iterators []chunkenc.Iterator) chunkenc.Iterator {
|
||||
return &chainSampleIterator{
|
||||
iterators: iterators,
|
||||
h: nil,
|
||||
lastt: math.MinInt64,
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -458,47 +462,74 @@ func (c *chainSampleIterator) Seek(t int64) bool {
|
|||
heap.Push(&c.h, iter)
|
||||
}
|
||||
}
|
||||
return len(c.h) > 0
|
||||
if len(c.h) > 0 {
|
||||
c.curr = heap.Pop(&c.h).(chunkenc.Iterator)
|
||||
return true
|
||||
}
|
||||
c.curr = nil
|
||||
return false
|
||||
}
|
||||
|
||||
func (c *chainSampleIterator) At() (t int64, v float64) {
|
||||
if len(c.h) == 0 {
|
||||
panic("chainSampleIterator.At() called after .Next() returned false.")
|
||||
if c.curr == nil {
|
||||
panic("chainSampleIterator.At() called before first .Next() or after .Next() returned false.")
|
||||
}
|
||||
|
||||
return c.h[0].At()
|
||||
return c.curr.At()
|
||||
}
|
||||
|
||||
func (c *chainSampleIterator) Next() bool {
|
||||
if c.h == nil {
|
||||
for _, iter := range c.iterators {
|
||||
c.h = samplesIteratorHeap{}
|
||||
// We call c.curr.Next() as the first thing below.
|
||||
// So, we don't call Next() on it here.
|
||||
c.curr = c.iterators[0]
|
||||
for _, iter := range c.iterators[1:] {
|
||||
if iter.Next() {
|
||||
heap.Push(&c.h, iter)
|
||||
}
|
||||
}
|
||||
|
||||
return len(c.h) > 0
|
||||
}
|
||||
|
||||
if len(c.h) == 0 {
|
||||
if c.curr == nil {
|
||||
return false
|
||||
}
|
||||
|
||||
currt, _ := c.At()
|
||||
for len(c.h) > 0 {
|
||||
nextt, _ := c.h[0].At()
|
||||
// All but one of the overlapping samples will be dropped.
|
||||
if nextt != currt {
|
||||
break
|
||||
var currt int64
|
||||
for {
|
||||
if c.curr.Next() {
|
||||
currt, _ = c.curr.At()
|
||||
if currt == c.lastt {
|
||||
// Ignoring sample for the same timestamp.
|
||||
continue
|
||||
}
|
||||
if len(c.h) == 0 {
|
||||
// curr is the only iterator remaining,
|
||||
// no need to check with the heap.
|
||||
break
|
||||
}
|
||||
|
||||
// Check current iterator with the top of the heap.
|
||||
if nextt, _ := c.h[0].At(); currt < nextt {
|
||||
// Current iterator has smaller timestamp than the heap.
|
||||
break
|
||||
}
|
||||
// Current iterator does not hold the smallest timestamp.
|
||||
heap.Push(&c.h, c.curr)
|
||||
} else if len(c.h) == 0 {
|
||||
// No iterator left to iterate.
|
||||
c.curr = nil
|
||||
return false
|
||||
}
|
||||
|
||||
iter := heap.Pop(&c.h).(chunkenc.Iterator)
|
||||
if iter.Next() {
|
||||
heap.Push(&c.h, iter)
|
||||
c.curr = heap.Pop(&c.h).(chunkenc.Iterator)
|
||||
currt, _ = c.curr.At()
|
||||
if currt != c.lastt {
|
||||
break
|
||||
}
|
||||
}
|
||||
|
||||
return len(c.h) > 0
|
||||
c.lastt = currt
|
||||
return true
|
||||
}
|
||||
|
||||
func (c *chainSampleIterator) Err() error {
|
||||
|
|
|
@ -448,10 +448,10 @@ func TestCompactingChunkSeriesMerger(t *testing.T) {
|
|||
name: "three in chained overlap",
|
||||
input: []ChunkSeries{
|
||||
NewListChunkSeriesFromSamples(labels.FromStrings("bar", "baz"), []tsdbutil.Sample{sample{1, 1}, sample{2, 2}, sample{3, 3}, sample{5, 5}}),
|
||||
NewListChunkSeriesFromSamples(labels.FromStrings("bar", "baz"), []tsdbutil.Sample{sample{4, 4}, sample{6, 6}}),
|
||||
NewListChunkSeriesFromSamples(labels.FromStrings("bar", "baz"), []tsdbutil.Sample{sample{4, 4}, sample{6, 66}}),
|
||||
NewListChunkSeriesFromSamples(labels.FromStrings("bar", "baz"), []tsdbutil.Sample{sample{6, 6}, sample{10, 10}}),
|
||||
},
|
||||
expected: NewListChunkSeriesFromSamples(labels.FromStrings("bar", "baz"), []tsdbutil.Sample{sample{1, 1}, sample{2, 2}, sample{3, 3}, sample{4, 4}, sample{5, 5}, sample{6, 6}, sample{10, 10}}),
|
||||
expected: NewListChunkSeriesFromSamples(labels.FromStrings("bar", "baz"), []tsdbutil.Sample{sample{1, 1}, sample{2, 2}, sample{3, 3}, sample{4, 4}, sample{5, 5}, sample{6, 66}, sample{10, 10}}),
|
||||
},
|
||||
{
|
||||
name: "three in chained overlap complex",
|
||||
|
|
Loading…
Reference in New Issue