Reuse Chunk Iterator (#642)

* Reset method for chunkenc.Iterator

Signed-off-by: Ganesh Vernekar <cs15btech11018@iith.ac.in>

* Reset method only for XORIterator

Signed-off-by: Ganesh Vernekar <cs15btech11018@iith.ac.in>

* Use Reset(...) in querier.go

Signed-off-by: Ganesh Vernekar <cs15btech11018@iith.ac.in>

* Reuse deletedIterator

Signed-off-by: Ganesh Vernekar <cs15btech11018@iith.ac.in>

* Another way of reusing chunk iterators

Signed-off-by: Ganesh Vernekar <cs15btech11018@iith.ac.in>

* Unexport xorIterator

Signed-off-by: Ganesh Vernekar <cs15btech11018@iith.ac.in>

* Fix memSeries.iterator(...)

Signed-off-by: Ganesh Vernekar <cs15btech11018@iith.ac.in>

* Add some comments

Signed-off-by: Ganesh Vernekar <cs15btech11018@iith.ac.in>
This commit is contained in:
Ganesh Vernekar 2019-07-09 15:19:34 +05:30 committed by GitHub
parent 8dfa537843
commit b1cd829030
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
11 changed files with 90 additions and 45 deletions

View File

@ -1,5 +1,7 @@
## Master / unreleased
- [FEATURE] `chunkenc.Chunk.Iterator` method now takes a `chunkenc.Iterator` interface as an argument for reuse.
## 0.9.1
- [CHANGE] LiveReader metrics are now injected rather than global.
@ -19,6 +21,7 @@
- [ENHANCEMENT] Reduced disk usage for WAL for small setups.
- [ENHANCEMENT] Optimize queries using regexp for set lookups.
## 0.8.0
- [BUGFIX] Calling `Close` more than once on a querier returns an error instead of a panic.

View File

@ -44,7 +44,10 @@ type Chunk interface {
Bytes() []byte
Encoding() Encoding
Appender() (Appender, error)
Iterator() Iterator
// The iterator passed as argument is for re-use.
// Depending on implementation, the iterator can
// be re-used or a new iterator can be allocated.
Iterator(Iterator) Iterator
NumSamples() int
}

View File

@ -77,7 +77,7 @@ func testChunk(c Chunk) error {
// fmt.Println("appended", len(c.Bytes()), c.Bytes())
}
it := c.Iterator()
it := c.Iterator(nil)
var res []pair
for it.Next() {
ts, v := it.At()
@ -133,9 +133,10 @@ func benchmarkIterator(b *testing.B, newChunk func() Chunk) {
res := make([]float64, 0, 1024)
var it Iterator
for i := 0; i < len(chunks); i++ {
c := chunks[i]
it := c.Iterator()
it := c.Iterator(it)
for it.Next() {
_, v := it.At()

View File

@ -77,7 +77,7 @@ func (c *XORChunk) NumSamples() int {
// Appender implements the Chunk interface.
func (c *XORChunk) Appender() (Appender, error) {
it := c.iterator()
it := c.iterator(nil)
// To get an appender we must know the state it would have if we had
// appended all existing data from scratch.
@ -102,19 +102,25 @@ func (c *XORChunk) Appender() (Appender, error) {
return a, nil
}
func (c *XORChunk) iterator() *xorIterator {
func (c *XORChunk) iterator(it Iterator) *xorIterator {
// Should iterators guarantee to act on a copy of the data so it doesn't lock append?
// When using striped locks to guard access to chunks, probably yes.
// Could only copy data if the chunk is not completed yet.
if xorIter, ok := it.(*xorIterator); ok {
xorIter.Reset(c.b.bytes())
return xorIter
}
return &xorIterator{
// The first 2 bytes contain chunk headers.
// We skip that for actual samples.
br: newBReader(c.b.bytes()[2:]),
numTotal: binary.BigEndian.Uint16(c.b.bytes()),
}
}
// Iterator implements the Chunk interface.
func (c *XORChunk) Iterator() Iterator {
return c.iterator()
func (c *XORChunk) Iterator(it Iterator) Iterator {
return c.iterator(it)
}
type xorAppender struct {
@ -243,6 +249,21 @@ func (it *xorIterator) Err() error {
return it.err
}
// Reset points the iterator at the chunk data in b and clears all decoding
// state left over from a previous chunk, so the allocation can be reused.
func (it *xorIterator) Reset(b []byte) {
	// The leading 2 bytes are the chunk header (the big-endian sample
	// count); the encoded sample stream begins at b[2:].
	it.numTotal = binary.BigEndian.Uint16(b)
	it.br = newBReader(b[2:])

	// Wipe every piece of per-chunk decoding state.
	it.numRead = 0
	it.t, it.tDelta = 0, 0
	it.val = 0
	it.leading, it.trailing = 0, 0
	it.err = nil
}
func (it *xorIterator) Next() bool {
if it.err != nil || it.numRead == it.numTotal {
return false

View File

@ -245,8 +245,8 @@ func MergeChunks(a, b chunkenc.Chunk) (*chunkenc.XORChunk, error) {
if err != nil {
return nil, err
}
ait := a.Iterator()
bit := b.Iterator()
ait := a.Iterator(nil)
bit := b.Iterator(nil)
aok, bok := ait.Next(), bit.Next()
for aok && bok {
at, av := ait.At()

View File

@ -736,6 +736,7 @@ func (c *LeveledCompactor) populateBlock(blocks []BlockReader, meta *BlockMeta,
return errors.Wrap(err, "add symbols")
}
delIter := &deletedIterator{}
for set.Next() {
select {
case <-c.ctx.Done():
@ -788,17 +789,18 @@ func (c *LeveledCompactor) populateBlock(blocks []BlockReader, meta *BlockMeta,
return err
}
it := &deletedIterator{it: chk.Chunk.Iterator(), intervals: dranges}
delIter.it = chk.Chunk.Iterator(delIter.it)
delIter.intervals = dranges
var (
t int64
v float64
)
for it.Next() {
t, v = it.At()
for delIter.Next() {
t, v = delIter.At()
app.Append(t, v)
}
if err := it.Err(); err != nil {
if err := delIter.Err(); err != nil {
return errors.Wrap(err, "iterate chunk while re-encoding")
}

20
head.go
View File

@ -1185,9 +1185,9 @@ type safeChunk struct {
cid int
}
func (c *safeChunk) Iterator() chunkenc.Iterator {
func (c *safeChunk) Iterator(reuseIter chunkenc.Iterator) chunkenc.Iterator {
c.s.Lock()
it := c.s.iterator(c.cid)
it := c.s.iterator(c.cid, reuseIter)
c.s.Unlock()
return it
}
@ -1739,7 +1739,7 @@ func computeChunkEndTime(start, cur, max int64) int64 {
return start + (max-start)/a
}
func (s *memSeries) iterator(id int) chunkenc.Iterator {
func (s *memSeries) iterator(id int, it chunkenc.Iterator) chunkenc.Iterator {
c := s.chunk(id)
// TODO(fabxc): Work around! A querier may have retrieved a pointer to a series' chunk,
// which got then garbage collected before it got accessed.
@ -1749,17 +1749,23 @@ func (s *memSeries) iterator(id int) chunkenc.Iterator {
}
if id-s.firstChunkID < len(s.chunks)-1 {
return c.chunk.Iterator()
return c.chunk.Iterator(it)
}
// Serve the last 4 samples for the last chunk from the sample buffer
// as their compressed bytes may be mutated by added samples.
it := &memSafeIterator{
Iterator: c.chunk.Iterator(),
if msIter, ok := it.(*memSafeIterator); ok {
msIter.Iterator = c.chunk.Iterator(msIter.Iterator)
msIter.i = -1
msIter.total = c.chunk.NumSamples()
msIter.buf = s.sampleBuf
return msIter
}
return &memSafeIterator{
Iterator: c.chunk.Iterator(it),
i: -1,
total: c.chunk.NumSamples(),
buf: s.sampleBuf,
}
return it
}
func (s *memSeries) head() *memChunk {

View File

@ -159,9 +159,9 @@ func TestHead_ReadWAL(t *testing.T) {
testutil.Ok(t, c.Err())
return x
}
testutil.Equals(t, []sample{{100, 2}, {101, 5}}, expandChunk(s10.iterator(0)))
testutil.Equals(t, []sample{{101, 6}}, expandChunk(s50.iterator(0)))
testutil.Equals(t, []sample{{100, 3}, {101, 7}}, expandChunk(s100.iterator(0)))
testutil.Equals(t, []sample{{100, 2}, {101, 5}}, expandChunk(s10.iterator(0, nil)))
testutil.Equals(t, []sample{{101, 6}}, expandChunk(s50.iterator(0, nil)))
testutil.Equals(t, []sample{{100, 3}, {101, 7}}, expandChunk(s100.iterator(0, nil)))
})
}
}
@ -313,11 +313,11 @@ func TestMemSeries_truncateChunks(t *testing.T) {
// Validate that the series' sample buffer is applied correctly to the last chunk
// after truncation.
it1 := s.iterator(s.chunkID(len(s.chunks) - 1))
it1 := s.iterator(s.chunkID(len(s.chunks)-1), nil)
_, ok := it1.(*memSafeIterator)
testutil.Assert(t, ok == true, "")
it2 := s.iterator(s.chunkID(len(s.chunks) - 2))
it2 := s.iterator(s.chunkID(len(s.chunks)-2), nil)
_, ok = it2.(*memSafeIterator)
testutil.Assert(t, ok == false, "non-last chunk incorrectly wrapped with sample buffer")
}
@ -451,10 +451,11 @@ func TestHeadDeleteSimple(t *testing.T) {
chunkr, err := h.Chunks()
testutil.Ok(t, err)
var ii chunkenc.Iterator
for _, meta := range chkMetas {
chk, err := chunkr.Chunk(meta.Ref)
testutil.Ok(t, err)
ii := chk.Iterator()
ii = chk.Iterator(ii)
for ii.Next() {
t, v := ii.At()
actSamples = append(actSamples, sample{t: t, v: v})

View File

@ -14,6 +14,7 @@
package tsdb
import (
"github.com/prometheus/tsdb/chunkenc"
"github.com/prometheus/tsdb/chunks"
"github.com/prometheus/tsdb/index"
"github.com/prometheus/tsdb/labels"
@ -40,10 +41,11 @@ func (m *mockIndexWriter) AddSeries(ref uint64, l labels.Labels, chunks ...chunk
i = len(m.series) - 1
}
var iter chunkenc.Iterator
for _, chk := range chunks {
samples := make([]sample, 0, chk.Chunk.NumSamples())
iter := chk.Chunk.Iterator()
iter = chk.Chunk.Iterator(iter)
for iter.Next() {
s := sample{}
s.t, s.v = iter.At()

View File

@ -1060,8 +1060,9 @@ func (it *verticalMergeSeriesIterator) Err() error {
type chunkSeriesIterator struct {
chunks []chunks.Meta
i int
cur chunkenc.Iterator
i int
cur chunkenc.Iterator
bufDelIter *deletedIterator
maxt, mint int64
@ -1069,21 +1070,32 @@ type chunkSeriesIterator struct {
}
func newChunkSeriesIterator(cs []chunks.Meta, dranges Intervals, mint, maxt int64) *chunkSeriesIterator {
it := cs[0].Chunk.Iterator()
if len(dranges) > 0 {
it = &deletedIterator{it: it, intervals: dranges}
}
return &chunkSeriesIterator{
csi := &chunkSeriesIterator{
chunks: cs,
i: 0,
cur: it,
mint: mint,
maxt: maxt,
intervals: dranges,
}
csi.resetCurIterator()
return csi
}
// resetCurIterator (re)builds it.cur for the chunk at index it.i, reusing the
// previous underlying iterator (and the deletion wrapper) where possible.
func (it *chunkSeriesIterator) resetCurIterator() {
	if len(it.intervals) > 0 {
		// Deleted ranges present: serve samples through a deletedIterator.
		// Allocate the wrapper lazily, exactly once, and reuse it afterwards.
		if it.bufDelIter == nil {
			it.bufDelIter = &deletedIterator{intervals: it.intervals}
		}
		it.bufDelIter.it = it.chunks[it.i].Chunk.Iterator(it.bufDelIter.it)
		it.cur = it.bufDelIter
		return
	}
	// No deletions: hand back the chunk's iterator directly, recycling the
	// previous one.
	it.cur = it.chunks[it.i].Chunk.Iterator(it.cur)
}
func (it *chunkSeriesIterator) Seek(t int64) (ok bool) {
@ -1102,10 +1114,7 @@ func (it *chunkSeriesIterator) Seek(t int64) (ok bool) {
}
}
it.cur = it.chunks[it.i].Chunk.Iterator()
if len(it.intervals) > 0 {
it.cur = &deletedIterator{it: it.cur, intervals: it.intervals}
}
it.resetCurIterator()
for it.cur.Next() {
t0, _ := it.cur.At()
@ -1145,10 +1154,7 @@ func (it *chunkSeriesIterator) Next() bool {
}
it.i++
it.cur = it.chunks[it.i].Chunk.Iterator()
if len(it.intervals) > 0 {
it.cur = &deletedIterator{it: it.cur, intervals: it.intervals}
}
it.resetCurIterator()
return it.Next()
}

View File

@ -1270,7 +1270,7 @@ func TestDeletedIterator(t *testing.T) {
for _, c := range cases {
i := int64(-1)
it := &deletedIterator{it: chk.Iterator(), intervals: c.r[:]}
it := &deletedIterator{it: chk.Iterator(nil), intervals: c.r[:]}
ranges := c.r[:]
for it.Next() {
i++