mirror of
https://github.com/prometheus/prometheus
synced 2024-12-29 02:02:17 +00:00
Move BufferedSeriesIterator in own package
This functionality is useful for a lot of clients but not relevant to the TSDB's core features.
This commit is contained in:
parent
e478d0e3bc
commit
3be4ef94ce
5
head.go
5
head.go
@ -587,6 +587,11 @@ func (h *headBlock) create(hash uint64, lset labels.Labels) *memSeries {
|
||||
return s
|
||||
}
|
||||
|
||||
type sample struct {
|
||||
t int64
|
||||
v float64
|
||||
}
|
||||
|
||||
type memSeries struct {
|
||||
mtx sync.RWMutex
|
||||
|
||||
|
197
querier.go
197
querier.go
@ -2,7 +2,6 @@ package tsdb
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"math"
|
||||
"sort"
|
||||
"strings"
|
||||
|
||||
@ -605,202 +604,6 @@ func (it *chunkSeriesIterator) Err() error {
|
||||
return it.cur.Err()
|
||||
}
|
||||
|
||||
// BufferedSeriesIterator wraps an iterator with a look-back buffer.
|
||||
type BufferedSeriesIterator struct {
|
||||
it SeriesIterator
|
||||
buf *sampleRing
|
||||
|
||||
lastTime int64
|
||||
}
|
||||
|
||||
// NewBuffer returns a new iterator that buffers the values within the time range
|
||||
// of the current element and the duration of delta before.
|
||||
func NewBuffer(it SeriesIterator, delta int64) *BufferedSeriesIterator {
|
||||
return &BufferedSeriesIterator{
|
||||
it: it,
|
||||
buf: newSampleRing(delta, 16),
|
||||
lastTime: math.MinInt64,
|
||||
}
|
||||
}
|
||||
|
||||
// PeekBack returns the previous element of the iterator. If there is none buffered,
|
||||
// ok is false.
|
||||
func (b *BufferedSeriesIterator) PeekBack() (t int64, v float64, ok bool) {
|
||||
return b.buf.last()
|
||||
}
|
||||
|
||||
// Buffer returns an iterator over the buffered data.
|
||||
func (b *BufferedSeriesIterator) Buffer() SeriesIterator {
|
||||
return b.buf.iterator()
|
||||
}
|
||||
|
||||
// Seek advances the iterator to the element at time t or greater.
|
||||
func (b *BufferedSeriesIterator) Seek(t int64) bool {
|
||||
t0 := t - b.buf.delta
|
||||
|
||||
// If the delta would cause us to seek backwards, preserve the buffer
|
||||
// and just continue regular advancment while filling the buffer on the way.
|
||||
if t0 > b.lastTime {
|
||||
b.buf.reset()
|
||||
|
||||
ok := b.it.Seek(t0)
|
||||
if !ok {
|
||||
return false
|
||||
}
|
||||
b.lastTime, _ = b.At()
|
||||
}
|
||||
|
||||
if b.lastTime >= t {
|
||||
return true
|
||||
}
|
||||
for b.Next() {
|
||||
if b.lastTime >= t {
|
||||
return true
|
||||
}
|
||||
}
|
||||
|
||||
return false
|
||||
}
|
||||
|
||||
// Next advances the iterator to the next element.
|
||||
func (b *BufferedSeriesIterator) Next() bool {
|
||||
// Add current element to buffer before advancing.
|
||||
b.buf.add(b.it.At())
|
||||
|
||||
ok := b.it.Next()
|
||||
if ok {
|
||||
b.lastTime, _ = b.At()
|
||||
}
|
||||
return ok
|
||||
}
|
||||
|
||||
// At returns the current element of the iterator.
|
||||
func (b *BufferedSeriesIterator) At() (int64, float64) {
|
||||
return b.it.At()
|
||||
}
|
||||
|
||||
// Err returns the last encountered error.
|
||||
func (b *BufferedSeriesIterator) Err() error {
|
||||
return b.it.Err()
|
||||
}
|
||||
|
||||
type sample struct {
|
||||
t int64
|
||||
v float64
|
||||
}
|
||||
|
||||
type sampleRing struct {
|
||||
delta int64
|
||||
|
||||
buf []sample // lookback buffer
|
||||
i int // position of most recent element in ring buffer
|
||||
f int // position of first element in ring buffer
|
||||
l int // number of elements in buffer
|
||||
}
|
||||
|
||||
func newSampleRing(delta int64, sz int) *sampleRing {
|
||||
r := &sampleRing{delta: delta, buf: make([]sample, sz)}
|
||||
r.reset()
|
||||
|
||||
return r
|
||||
}
|
||||
|
||||
func (r *sampleRing) reset() {
|
||||
r.l = 0
|
||||
r.i = -1
|
||||
r.f = 0
|
||||
}
|
||||
|
||||
func (r *sampleRing) iterator() SeriesIterator {
|
||||
return &sampleRingIterator{r: r, i: -1}
|
||||
}
|
||||
|
||||
type sampleRingIterator struct {
|
||||
r *sampleRing
|
||||
i int
|
||||
}
|
||||
|
||||
func (it *sampleRingIterator) Next() bool {
|
||||
it.i++
|
||||
return it.i < it.r.l
|
||||
}
|
||||
|
||||
func (it *sampleRingIterator) Seek(int64) bool {
|
||||
return false
|
||||
}
|
||||
|
||||
func (it *sampleRingIterator) Err() error {
|
||||
return nil
|
||||
}
|
||||
|
||||
func (it *sampleRingIterator) At() (int64, float64) {
|
||||
return it.r.at(it.i)
|
||||
}
|
||||
|
||||
func (r *sampleRing) at(i int) (int64, float64) {
|
||||
j := (r.f + i) % len(r.buf)
|
||||
s := r.buf[j]
|
||||
return s.t, s.v
|
||||
}
|
||||
|
||||
// add adds a sample to the ring buffer and frees all samples that fall
|
||||
// out of the delta range.
|
||||
func (r *sampleRing) add(t int64, v float64) {
|
||||
l := len(r.buf)
|
||||
// Grow the ring buffer if it fits no more elements.
|
||||
if l == r.l {
|
||||
buf := make([]sample, 2*l)
|
||||
copy(buf[l+r.f:], r.buf[r.f:])
|
||||
copy(buf, r.buf[:r.f])
|
||||
|
||||
r.buf = buf
|
||||
r.i = r.f
|
||||
r.f += l
|
||||
} else {
|
||||
r.i++
|
||||
if r.i >= l {
|
||||
r.i -= l
|
||||
}
|
||||
}
|
||||
|
||||
r.buf[r.i] = sample{t: t, v: v}
|
||||
r.l++
|
||||
|
||||
// Free head of the buffer of samples that just fell out of the range.
|
||||
for r.buf[r.f].t < t-r.delta {
|
||||
r.f++
|
||||
if r.f >= l {
|
||||
r.f -= l
|
||||
}
|
||||
r.l--
|
||||
}
|
||||
}
|
||||
|
||||
// last returns the most recent element added to the ring.
|
||||
func (r *sampleRing) last() (int64, float64, bool) {
|
||||
if r.l == 0 {
|
||||
return 0, 0, false
|
||||
}
|
||||
s := r.buf[r.i]
|
||||
return s.t, s.v, true
|
||||
}
|
||||
|
||||
func (r *sampleRing) samples() []sample {
|
||||
res := make([]sample, r.l)
|
||||
|
||||
var k = r.f + r.l
|
||||
var j int
|
||||
if k > len(r.buf) {
|
||||
k = len(r.buf)
|
||||
j = r.l - k + r.f
|
||||
}
|
||||
|
||||
n := copy(res, r.buf[r.f:k])
|
||||
copy(res[n:], r.buf[:j])
|
||||
|
||||
return res
|
||||
}
|
||||
|
||||
type mockSeriesSet struct {
|
||||
next func() bool
|
||||
series func() Series
|
||||
|
116
querier_test.go
116
querier_test.go
@ -1,7 +1,6 @@
|
||||
package tsdb
|
||||
|
||||
import (
|
||||
"math/rand"
|
||||
"sort"
|
||||
"testing"
|
||||
|
||||
@ -201,118 +200,3 @@ func expandSeriesIterator(it SeriesIterator) (r []sample, err error) {
|
||||
|
||||
return r, it.Err()
|
||||
}
|
||||
|
||||
func TestSampleRing(t *testing.T) {
|
||||
cases := []struct {
|
||||
input []int64
|
||||
delta int64
|
||||
size int
|
||||
}{
|
||||
{
|
||||
input: []int64{1, 2, 3, 4, 5, 6, 7, 8, 9, 10},
|
||||
delta: 2,
|
||||
size: 1,
|
||||
},
|
||||
{
|
||||
input: []int64{1, 2, 3, 4, 5, 6, 7, 8, 9, 10},
|
||||
delta: 2,
|
||||
size: 2,
|
||||
},
|
||||
{
|
||||
input: []int64{1, 2, 3, 4, 5, 6, 7, 8, 9, 10},
|
||||
delta: 7,
|
||||
size: 3,
|
||||
},
|
||||
{
|
||||
input: []int64{1, 2, 3, 4, 5, 16, 17, 18, 19, 20},
|
||||
delta: 7,
|
||||
size: 1,
|
||||
},
|
||||
}
|
||||
for _, c := range cases {
|
||||
r := newSampleRing(c.delta, c.size)
|
||||
|
||||
input := []sample{}
|
||||
for _, t := range c.input {
|
||||
input = append(input, sample{
|
||||
t: t,
|
||||
v: float64(rand.Intn(100)),
|
||||
})
|
||||
}
|
||||
|
||||
for i, s := range input {
|
||||
r.add(s.t, s.v)
|
||||
buffered := r.samples()
|
||||
|
||||
for _, sold := range input[:i] {
|
||||
found := false
|
||||
for _, bs := range buffered {
|
||||
if bs.t == sold.t && bs.v == sold.v {
|
||||
found = true
|
||||
break
|
||||
}
|
||||
}
|
||||
if sold.t >= s.t-c.delta && !found {
|
||||
t.Fatalf("%d: expected sample %d to be in buffer but was not; buffer %v", i, sold.t, buffered)
|
||||
}
|
||||
if sold.t < s.t-c.delta && found {
|
||||
t.Fatalf("%d: unexpected sample %d in buffer; buffer %v", i, sold.t, buffered)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func TestBufferedSeriesIterator(t *testing.T) {
|
||||
var it *BufferedSeriesIterator
|
||||
|
||||
bufferEq := func(exp []sample) {
|
||||
var b []sample
|
||||
bit := it.Buffer()
|
||||
for bit.Next() {
|
||||
t, v := bit.At()
|
||||
b = append(b, sample{t: t, v: v})
|
||||
}
|
||||
require.Equal(t, exp, b, "buffer mismatch")
|
||||
}
|
||||
sampleEq := func(ets int64, ev float64) {
|
||||
ts, v := it.At()
|
||||
require.Equal(t, ets, ts, "timestamp mismatch")
|
||||
require.Equal(t, ev, v, "value mismatch")
|
||||
}
|
||||
|
||||
it = NewBuffer(newListSeriesIterator([]sample{
|
||||
{t: 1, v: 2},
|
||||
{t: 2, v: 3},
|
||||
{t: 3, v: 4},
|
||||
{t: 4, v: 5},
|
||||
{t: 5, v: 6},
|
||||
{t: 99, v: 8},
|
||||
{t: 100, v: 9},
|
||||
{t: 101, v: 10},
|
||||
}), 2)
|
||||
|
||||
require.True(t, it.Seek(-123), "seek failed")
|
||||
sampleEq(1, 2)
|
||||
bufferEq(nil)
|
||||
|
||||
require.True(t, it.Next(), "next failed")
|
||||
sampleEq(2, 3)
|
||||
bufferEq([]sample{{t: 1, v: 2}})
|
||||
|
||||
require.True(t, it.Next(), "next failed")
|
||||
require.True(t, it.Next(), "next failed")
|
||||
require.True(t, it.Next(), "next failed")
|
||||
sampleEq(5, 6)
|
||||
bufferEq([]sample{{t: 2, v: 3}, {t: 3, v: 4}, {t: 4, v: 5}})
|
||||
|
||||
require.True(t, it.Seek(5), "seek failed")
|
||||
sampleEq(5, 6)
|
||||
bufferEq([]sample{{t: 2, v: 3}, {t: 3, v: 4}, {t: 4, v: 5}})
|
||||
|
||||
require.True(t, it.Seek(101), "seek failed")
|
||||
sampleEq(101, 10)
|
||||
bufferEq([]sample{{t: 99, v: 8}, {t: 100, v: 9}})
|
||||
|
||||
require.False(t, it.Next(), "next succeeded unexpectedly")
|
||||
}
|
||||
|
203
tsdbutil/buffer.go
Normal file
203
tsdbutil/buffer.go
Normal file
@ -0,0 +1,203 @@
|
||||
package tsdbutil
|
||||
|
||||
import (
|
||||
"math"
|
||||
|
||||
"github.com/fabxc/tsdb"
|
||||
)
|
||||
|
||||
// BufferedSeriesIterator wraps an iterator with a look-back buffer.
|
||||
type BufferedSeriesIterator struct {
|
||||
it tsdb.SeriesIterator
|
||||
buf *sampleRing
|
||||
|
||||
lastTime int64
|
||||
}
|
||||
|
||||
// NewBuffer returns a new iterator that buffers the values within the time range
|
||||
// of the current element and the duration of delta before.
|
||||
func NewBuffer(it tsdb.SeriesIterator, delta int64) *BufferedSeriesIterator {
|
||||
return &BufferedSeriesIterator{
|
||||
it: it,
|
||||
buf: newSampleRing(delta, 16),
|
||||
lastTime: math.MinInt64,
|
||||
}
|
||||
}
|
||||
|
||||
// PeekBack returns the previous element of the iterator. If there is none buffered,
|
||||
// ok is false.
|
||||
func (b *BufferedSeriesIterator) PeekBack() (t int64, v float64, ok bool) {
|
||||
return b.buf.last()
|
||||
}
|
||||
|
||||
// Buffer returns an iterator over the buffered data.
|
||||
func (b *BufferedSeriesIterator) Buffer() tsdb.SeriesIterator {
|
||||
return b.buf.iterator()
|
||||
}
|
||||
|
||||
// Seek advances the iterator to the element at time t or greater.
|
||||
func (b *BufferedSeriesIterator) Seek(t int64) bool {
|
||||
t0 := t - b.buf.delta
|
||||
|
||||
// If the delta would cause us to seek backwards, preserve the buffer
|
||||
// and just continue regular advancment while filling the buffer on the way.
|
||||
if t0 > b.lastTime {
|
||||
b.buf.reset()
|
||||
|
||||
ok := b.it.Seek(t0)
|
||||
if !ok {
|
||||
return false
|
||||
}
|
||||
b.lastTime, _ = b.At()
|
||||
}
|
||||
|
||||
if b.lastTime >= t {
|
||||
return true
|
||||
}
|
||||
for b.Next() {
|
||||
if b.lastTime >= t {
|
||||
return true
|
||||
}
|
||||
}
|
||||
|
||||
return false
|
||||
}
|
||||
|
||||
// Next advances the iterator to the next element.
|
||||
func (b *BufferedSeriesIterator) Next() bool {
|
||||
// Add current element to buffer before advancing.
|
||||
b.buf.add(b.it.At())
|
||||
|
||||
ok := b.it.Next()
|
||||
if ok {
|
||||
b.lastTime, _ = b.At()
|
||||
}
|
||||
return ok
|
||||
}
|
||||
|
||||
// At returns the current element of the iterator.
|
||||
func (b *BufferedSeriesIterator) At() (int64, float64) {
|
||||
return b.it.At()
|
||||
}
|
||||
|
||||
// Err returns the last encountered error.
|
||||
func (b *BufferedSeriesIterator) Err() error {
|
||||
return b.it.Err()
|
||||
}
|
||||
|
||||
type sample struct {
|
||||
t int64
|
||||
v float64
|
||||
}
|
||||
|
||||
type sampleRing struct {
|
||||
delta int64
|
||||
|
||||
buf []sample // lookback buffer
|
||||
i int // position of most recent element in ring buffer
|
||||
f int // position of first element in ring buffer
|
||||
l int // number of elements in buffer
|
||||
}
|
||||
|
||||
func newSampleRing(delta int64, sz int) *sampleRing {
|
||||
r := &sampleRing{delta: delta, buf: make([]sample, sz)}
|
||||
r.reset()
|
||||
|
||||
return r
|
||||
}
|
||||
|
||||
func (r *sampleRing) reset() {
|
||||
r.l = 0
|
||||
r.i = -1
|
||||
r.f = 0
|
||||
}
|
||||
|
||||
func (r *sampleRing) iterator() tsdb.SeriesIterator {
|
||||
return &sampleRingIterator{r: r, i: -1}
|
||||
}
|
||||
|
||||
type sampleRingIterator struct {
|
||||
r *sampleRing
|
||||
i int
|
||||
}
|
||||
|
||||
func (it *sampleRingIterator) Next() bool {
|
||||
it.i++
|
||||
return it.i < it.r.l
|
||||
}
|
||||
|
||||
func (it *sampleRingIterator) Seek(int64) bool {
|
||||
return false
|
||||
}
|
||||
|
||||
func (it *sampleRingIterator) Err() error {
|
||||
return nil
|
||||
}
|
||||
|
||||
func (it *sampleRingIterator) At() (int64, float64) {
|
||||
return it.r.at(it.i)
|
||||
}
|
||||
|
||||
func (r *sampleRing) at(i int) (int64, float64) {
|
||||
j := (r.f + i) % len(r.buf)
|
||||
s := r.buf[j]
|
||||
return s.t, s.v
|
||||
}
|
||||
|
||||
// add adds a sample to the ring buffer and frees all samples that fall
|
||||
// out of the delta range.
|
||||
func (r *sampleRing) add(t int64, v float64) {
|
||||
l := len(r.buf)
|
||||
// Grow the ring buffer if it fits no more elements.
|
||||
if l == r.l {
|
||||
buf := make([]sample, 2*l)
|
||||
copy(buf[l+r.f:], r.buf[r.f:])
|
||||
copy(buf, r.buf[:r.f])
|
||||
|
||||
r.buf = buf
|
||||
r.i = r.f
|
||||
r.f += l
|
||||
} else {
|
||||
r.i++
|
||||
if r.i >= l {
|
||||
r.i -= l
|
||||
}
|
||||
}
|
||||
|
||||
r.buf[r.i] = sample{t: t, v: v}
|
||||
r.l++
|
||||
|
||||
// Free head of the buffer of samples that just fell out of the range.
|
||||
for r.buf[r.f].t < t-r.delta {
|
||||
r.f++
|
||||
if r.f >= l {
|
||||
r.f -= l
|
||||
}
|
||||
r.l--
|
||||
}
|
||||
}
|
||||
|
||||
// last returns the most recent element added to the ring.
|
||||
func (r *sampleRing) last() (int64, float64, bool) {
|
||||
if r.l == 0 {
|
||||
return 0, 0, false
|
||||
}
|
||||
s := r.buf[r.i]
|
||||
return s.t, s.v, true
|
||||
}
|
||||
|
||||
func (r *sampleRing) samples() []sample {
|
||||
res := make([]sample, r.l)
|
||||
|
||||
var k = r.f + r.l
|
||||
var j int
|
||||
if k > len(r.buf) {
|
||||
k = len(r.buf)
|
||||
j = r.l - k + r.f
|
||||
}
|
||||
|
||||
n := copy(res, r.buf[r.f:k])
|
||||
copy(res[n:], r.buf[:j])
|
||||
|
||||
return res
|
||||
}
|
160
tsdbutil/buffer_test.go
Normal file
160
tsdbutil/buffer_test.go
Normal file
@ -0,0 +1,160 @@
|
||||
package tsdbutil
|
||||
|
||||
import (
|
||||
"math/rand"
|
||||
"sort"
|
||||
"testing"
|
||||
|
||||
"github.com/stretchr/testify/require"
|
||||
)
|
||||
|
||||
func TestSampleRing(t *testing.T) {
|
||||
cases := []struct {
|
||||
input []int64
|
||||
delta int64
|
||||
size int
|
||||
}{
|
||||
{
|
||||
input: []int64{1, 2, 3, 4, 5, 6, 7, 8, 9, 10},
|
||||
delta: 2,
|
||||
size: 1,
|
||||
},
|
||||
{
|
||||
input: []int64{1, 2, 3, 4, 5, 6, 7, 8, 9, 10},
|
||||
delta: 2,
|
||||
size: 2,
|
||||
},
|
||||
{
|
||||
input: []int64{1, 2, 3, 4, 5, 6, 7, 8, 9, 10},
|
||||
delta: 7,
|
||||
size: 3,
|
||||
},
|
||||
{
|
||||
input: []int64{1, 2, 3, 4, 5, 16, 17, 18, 19, 20},
|
||||
delta: 7,
|
||||
size: 1,
|
||||
},
|
||||
}
|
||||
for _, c := range cases {
|
||||
r := newSampleRing(c.delta, c.size)
|
||||
|
||||
input := []sample{}
|
||||
for _, t := range c.input {
|
||||
input = append(input, sample{
|
||||
t: t,
|
||||
v: float64(rand.Intn(100)),
|
||||
})
|
||||
}
|
||||
|
||||
for i, s := range input {
|
||||
r.add(s.t, s.v)
|
||||
buffered := r.samples()
|
||||
|
||||
for _, sold := range input[:i] {
|
||||
found := false
|
||||
for _, bs := range buffered {
|
||||
if bs.t == sold.t && bs.v == sold.v {
|
||||
found = true
|
||||
break
|
||||
}
|
||||
}
|
||||
if sold.t >= s.t-c.delta && !found {
|
||||
t.Fatalf("%d: expected sample %d to be in buffer but was not; buffer %v", i, sold.t, buffered)
|
||||
}
|
||||
if sold.t < s.t-c.delta && found {
|
||||
t.Fatalf("%d: unexpected sample %d in buffer; buffer %v", i, sold.t, buffered)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func TestBufferedSeriesIterator(t *testing.T) {
|
||||
var it *BufferedSeriesIterator
|
||||
|
||||
bufferEq := func(exp []sample) {
|
||||
var b []sample
|
||||
bit := it.Buffer()
|
||||
for bit.Next() {
|
||||
t, v := bit.At()
|
||||
b = append(b, sample{t: t, v: v})
|
||||
}
|
||||
require.Equal(t, exp, b, "buffer mismatch")
|
||||
}
|
||||
sampleEq := func(ets int64, ev float64) {
|
||||
ts, v := it.At()
|
||||
require.Equal(t, ets, ts, "timestamp mismatch")
|
||||
require.Equal(t, ev, v, "value mismatch")
|
||||
}
|
||||
|
||||
it = NewBuffer(newListSeriesIterator([]sample{
|
||||
{t: 1, v: 2},
|
||||
{t: 2, v: 3},
|
||||
{t: 3, v: 4},
|
||||
{t: 4, v: 5},
|
||||
{t: 5, v: 6},
|
||||
{t: 99, v: 8},
|
||||
{t: 100, v: 9},
|
||||
{t: 101, v: 10},
|
||||
}), 2)
|
||||
|
||||
require.True(t, it.Seek(-123), "seek failed")
|
||||
sampleEq(1, 2)
|
||||
bufferEq(nil)
|
||||
|
||||
require.True(t, it.Next(), "next failed")
|
||||
sampleEq(2, 3)
|
||||
bufferEq([]sample{{t: 1, v: 2}})
|
||||
|
||||
require.True(t, it.Next(), "next failed")
|
||||
require.True(t, it.Next(), "next failed")
|
||||
require.True(t, it.Next(), "next failed")
|
||||
sampleEq(5, 6)
|
||||
bufferEq([]sample{{t: 2, v: 3}, {t: 3, v: 4}, {t: 4, v: 5}})
|
||||
|
||||
require.True(t, it.Seek(5), "seek failed")
|
||||
sampleEq(5, 6)
|
||||
bufferEq([]sample{{t: 2, v: 3}, {t: 3, v: 4}, {t: 4, v: 5}})
|
||||
|
||||
require.True(t, it.Seek(101), "seek failed")
|
||||
sampleEq(101, 10)
|
||||
bufferEq([]sample{{t: 99, v: 8}, {t: 100, v: 9}})
|
||||
|
||||
require.False(t, it.Next(), "next succeeded unexpectedly")
|
||||
}
|
||||
|
||||
type listSeriesIterator struct {
|
||||
list []sample
|
||||
idx int
|
||||
}
|
||||
|
||||
func newListSeriesIterator(list []sample) *listSeriesIterator {
|
||||
return &listSeriesIterator{list: list, idx: -1}
|
||||
}
|
||||
|
||||
func (it *listSeriesIterator) At() (int64, float64) {
|
||||
s := it.list[it.idx]
|
||||
return s.t, s.v
|
||||
}
|
||||
|
||||
func (it *listSeriesIterator) Next() bool {
|
||||
it.idx++
|
||||
return it.idx < len(it.list)
|
||||
}
|
||||
|
||||
func (it *listSeriesIterator) Seek(t int64) bool {
|
||||
if it.idx == -1 {
|
||||
it.idx = 0
|
||||
}
|
||||
// Do binary search between current position and end.
|
||||
it.idx = sort.Search(len(it.list)-it.idx, func(i int) bool {
|
||||
s := it.list[i+it.idx]
|
||||
return s.t >= t
|
||||
})
|
||||
|
||||
return it.idx < len(it.list)
|
||||
}
|
||||
|
||||
func (it *listSeriesIterator) Err() error {
|
||||
return nil
|
||||
}
|
Loading…
Reference in New Issue
Block a user