Merge pull request #4185 from prometheus/4184-dedup-read-recent
Dedupe samples in the mergeIterator.
This commit is contained in:
commit
18e6fa7c8a
|
@ -432,7 +432,6 @@ func (c *mergeIterator) At() (t int64, v float64) {
|
||||||
panic("mergeIterator.At() called after .Next() returned false.")
|
panic("mergeIterator.At() called after .Next() returned false.")
|
||||||
}
|
}
|
||||||
|
|
||||||
// TODO do I need to dedupe or just merge?
|
|
||||||
return c.h[0].At()
|
return c.h[0].At()
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -443,6 +442,7 @@ func (c *mergeIterator) Next() bool {
|
||||||
heap.Push(&c.h, iter)
|
heap.Push(&c.h, iter)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
return len(c.h) > 0
|
return len(c.h) > 0
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -450,9 +450,17 @@ func (c *mergeIterator) Next() bool {
|
||||||
return false
|
return false
|
||||||
}
|
}
|
||||||
|
|
||||||
iter := heap.Pop(&c.h).(SeriesIterator)
|
currt, currv := c.At()
|
||||||
if iter.Next() {
|
for len(c.h) > 0 {
|
||||||
heap.Push(&c.h, iter)
|
nextt, nextv := c.h[0].At()
|
||||||
|
if nextt != currt || nextv != currv {
|
||||||
|
break
|
||||||
|
}
|
||||||
|
|
||||||
|
iter := heap.Pop(&c.h).(SeriesIterator)
|
||||||
|
if iter.Next() {
|
||||||
|
heap.Push(&c.h, iter)
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
return len(c.h) > 0
|
return len(c.h) > 0
|
||||||
|
|
|
@ -136,6 +136,14 @@ func TestMergeIterator(t *testing.T) {
|
||||||
},
|
},
|
||||||
expected: []sample{{0, 0}, {1, 1}, {2, 2}, {3, 3}, {4, 4}, {5, 5}},
|
expected: []sample{{0, 0}, {1, 1}, {2, 2}, {3, 3}, {4, 4}, {5, 5}},
|
||||||
},
|
},
|
||||||
|
{
|
||||||
|
input: []SeriesIterator{
|
||||||
|
newListSeriesIterator([]sample{{0, 0}, {1, 1}}),
|
||||||
|
newListSeriesIterator([]sample{{0, 0}, {2, 2}}),
|
||||||
|
newListSeriesIterator([]sample{{2, 2}, {3, 3}}),
|
||||||
|
},
|
||||||
|
expected: []sample{{0, 0}, {1, 1}, {2, 2}, {3, 3}},
|
||||||
|
},
|
||||||
} {
|
} {
|
||||||
merged := newMergeIterator(tc.input)
|
merged := newMergeIterator(tc.input)
|
||||||
actual := drainSamples(merged)
|
actual := drainSamples(merged)
|
||||||
|
|
Loading…
Reference in New Issue