From ba418780be3ac1e7611c03d4e8db39cabb505bd6 Mon Sep 17 00:00:00 2001 From: Tom Wilkie Date: Wed, 23 May 2018 12:15:47 +0100 Subject: [PATCH] Dedupe samples in the mergeIterator. Signed-off-by: Tom Wilkie --- storage/fanout.go | 16 ++++++++++++---- storage/fanout_test.go | 8 ++++++++ 2 files changed, 20 insertions(+), 4 deletions(-) diff --git a/storage/fanout.go b/storage/fanout.go index 7835d1ef7..328287159 100644 --- a/storage/fanout.go +++ b/storage/fanout.go @@ -432,7 +432,6 @@ func (c *mergeIterator) At() (t int64, v float64) { panic("mergeIterator.At() called after .Next() returned false.") } - // TODO do I need to dedupe or just merge? return c.h[0].At() } @@ -443,6 +442,7 @@ func (c *mergeIterator) Next() bool { heap.Push(&c.h, iter) } } + return len(c.h) > 0 } @@ -450,9 +450,17 @@ func (c *mergeIterator) Next() bool { return false } - iter := heap.Pop(&c.h).(SeriesIterator) - if iter.Next() { - heap.Push(&c.h, iter) + currt, currv := c.At() + for len(c.h) > 0 { + nextt, nextv := c.h[0].At() + if nextt != currt || nextv != currv { + break + } + + iter := heap.Pop(&c.h).(SeriesIterator) + if iter.Next() { + heap.Push(&c.h, iter) + } } return len(c.h) > 0 diff --git a/storage/fanout_test.go b/storage/fanout_test.go index 66d86fc93..f6298c859 100644 --- a/storage/fanout_test.go +++ b/storage/fanout_test.go @@ -136,6 +136,14 @@ func TestMergeIterator(t *testing.T) { }, expected: []sample{{0, 0}, {1, 1}, {2, 2}, {3, 3}, {4, 4}, {5, 5}}, }, + { + input: []SeriesIterator{ + newListSeriesIterator([]sample{{0, 0}, {1, 1}}), + newListSeriesIterator([]sample{{0, 0}, {2, 2}}), + newListSeriesIterator([]sample{{2, 2}, {3, 3}}), + }, + expected: []sample{{0, 0}, {1, 1}, {2, 2}, {3, 3}}, + }, } { merged := newMergeIterator(tc.input) actual := drainSamples(merged)