Make sure deleted intervals are excluded from Seek (#6980)

Signed-off-by: Goutham Veeramachaneni <gouthamve@gmail.com>
This commit is contained in:
Goutham Veeramachaneni 2020-04-23 11:00:30 +02:00 committed by GitHub
parent f3c6d26781
commit 84b4d079c8
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 63 additions and 98 deletions

View File

@ -1159,7 +1159,28 @@ func (it *deletedIterator) Seek(t int64) bool {
if it.it.Err() != nil {
return false
}
return it.it.Seek(t)
if ok := it.it.Seek(t); !ok {
return false
}
// Now double check if the entry falls into a deleted interval.
ts, _ := it.At()
for _, itv := range it.intervals {
if ts < itv.Mint {
return true
}
if ts > itv.Maxt {
it.intervals = it.intervals[1:]
continue
}
// We're in the middle of an interval, we can now call Next().
return it.Next()
}
// The timestamp is greater than all the deleted intervals.
return true
}
func (it *deletedIterator) Next() bool {

View File

@ -1261,7 +1261,6 @@ func TestDeletedIterator(t *testing.T) {
}
for _, c := range cases {
t.Run("Simple", func(t *testing.T) {
i := int64(-1)
it := &deletedIterator{it: chk.Iterator(nil), intervals: c.r[:]}
ranges := c.r[:]
@ -1291,42 +1290,6 @@ func TestDeletedIterator(t *testing.T) {
testutil.Assert(t, i >= 1000, "")
testutil.Ok(t, it.Err())
})
t.Run("Seek", func(t *testing.T) {
const seek = 10
i := int64(seek)
it := &deletedIterator{it: chk.Iterator(nil), intervals: c.r[:]}
ranges := c.r[:]
testutil.Assert(t, it.Seek(seek), "")
for it.Next() {
i++
for _, tr := range ranges {
if tr.InBounds(i) {
i = tr.Maxt + 1
ranges = ranges[1:]
}
}
testutil.Assert(t, i < 1000, "")
ts, v := it.At()
testutil.Equals(t, act[i].t, ts)
testutil.Equals(t, act[i].v, v)
}
// There has been an extra call to Next().
i++
for _, tr := range ranges {
if tr.InBounds(i) {
i = tr.Maxt + 1
ranges = ranges[1:]
}
}
testutil.Assert(t, i >= 1000, "")
testutil.Ok(t, it.Err())
})
}
}
@ -1338,55 +1301,36 @@ func TestDeletedIterator_WithSeek(t *testing.T) {
act := make([]sample, 1000)
for i := 0; i < 1000; i++ {
act[i].t = int64(i)
act[i].v = rand.Float64()
act[i].v = float64(i)
app.Append(act[i].t, act[i].v)
}
cases := []struct {
r tombstones.Intervals
seek int64
ok bool
seekedTs int64
}{
{r: tombstones.Intervals{{Mint: 1, Maxt: 20}}},
{r: tombstones.Intervals{{Mint: 1, Maxt: 10}, {Mint: 12, Maxt: 20}, {Mint: 21, Maxt: 23}, {Mint: 25, Maxt: 30}}},
{r: tombstones.Intervals{{Mint: 1, Maxt: 10}, {Mint: 12, Maxt: 20}, {Mint: 20, Maxt: 30}}},
{r: tombstones.Intervals{{Mint: 1, Maxt: 10}, {Mint: 12, Maxt: 23}, {Mint: 25, Maxt: 30}}},
{r: tombstones.Intervals{{Mint: 1, Maxt: 23}, {Mint: 12, Maxt: 20}, {Mint: 25, Maxt: 30}}},
{r: tombstones.Intervals{{Mint: 1, Maxt: 23}, {Mint: 12, Maxt: 20}, {Mint: 25, Maxt: 3000}}},
{r: tombstones.Intervals{{Mint: 0, Maxt: 2000}}},
{r: tombstones.Intervals{{Mint: 500, Maxt: 2000}}},
{r: tombstones.Intervals{{Mint: 0, Maxt: 200}}},
{r: tombstones.Intervals{{Mint: 1000, Maxt: 20000}}},
{r: tombstones.Intervals{{Mint: 1, Maxt: 20}}, seek: 1, ok: true, seekedTs: 21},
{r: tombstones.Intervals{{Mint: 1, Maxt: 20}}, seek: 20, ok: true, seekedTs: 21},
{r: tombstones.Intervals{{Mint: 1, Maxt: 20}}, seek: 10, ok: true, seekedTs: 21},
{r: tombstones.Intervals{{Mint: 1, Maxt: 20}}, seek: 999, ok: true, seekedTs: 999},
{r: tombstones.Intervals{{Mint: 1, Maxt: 20}}, seek: 1000, ok: false},
{r: tombstones.Intervals{{Mint: 1, Maxt: 23}, {Mint: 24, Maxt: 40}, {Mint: 45, Maxt: 3000}}, seek: 1, ok: true, seekedTs: 41},
{r: tombstones.Intervals{{Mint: 5, Maxt: 23}, {Mint: 24, Maxt: 40}, {Mint: 41, Maxt: 3000}}, seek: 5, ok: false},
{r: tombstones.Intervals{{Mint: 0, Maxt: 2000}}, seek: 10, ok: false},
{r: tombstones.Intervals{{Mint: 500, Maxt: 2000}}, seek: 10, ok: true, seekedTs: 10},
{r: tombstones.Intervals{{Mint: 500, Maxt: 2000}}, seek: 501, ok: false},
}
for _, c := range cases {
i := int64(-1)
it := &deletedIterator{it: chk.Iterator(nil), intervals: c.r[:]}
ranges := c.r[:]
for it.Next() {
i++
for _, tr := range ranges {
if tr.InBounds(i) {
i = tr.Maxt + 1
ranges = ranges[1:]
}
}
testutil.Assert(t, i < 1000, "")
ts, v := it.At()
testutil.Equals(t, act[i].t, ts)
testutil.Equals(t, act[i].v, v)
testutil.Equals(t, c.ok, it.Seek(c.seek))
if c.ok {
ts, _ := it.At()
testutil.Equals(t, c.seekedTs, ts)
}
// There has been an extra call to Next().
i++
for _, tr := range ranges {
if tr.InBounds(i) {
i = tr.Maxt + 1
ranges = ranges[1:]
}
}
testutil.Assert(t, i >= 1000, "")
testutil.Ok(t, it.Err())
}
}