diff --git a/tsdb/chunkenc/bstream.go b/tsdb/chunkenc/bstream.go index 7b17f4686..8cc59f3ea 100644 --- a/tsdb/chunkenc/bstream.go +++ b/tsdb/chunkenc/bstream.go @@ -52,6 +52,12 @@ type bstream struct { count uint8 // How many right-most bits are available for writing in the current byte (the last byte of the stream). } +// Reset resets b around stream. +func (b *bstream) Reset(stream []byte) { + b.stream = stream + b.count = 0 +} + func (b *bstream) bytes() []byte { return b.stream } diff --git a/tsdb/chunkenc/bstream_test.go b/tsdb/chunkenc/bstream_test.go index 66a54bc8e..8ac45ef0b 100644 --- a/tsdb/chunkenc/bstream_test.go +++ b/tsdb/chunkenc/bstream_test.go @@ -19,6 +19,19 @@ import ( "github.com/stretchr/testify/require" ) +func TestBstream_Reset(t *testing.T) { + bs := bstream{ + stream: []byte("test"), + count: 10, + } + bs.Reset([]byte("was reset")) + + require.Equal(t, bstream{ + stream: []byte("was reset"), + count: 0, + }, bs) +} + func TestBstreamReader(t *testing.T) { // Write to the bit stream. w := bstream{} diff --git a/tsdb/chunkenc/chunk.go b/tsdb/chunkenc/chunk.go index 21c41257b..1421f3b39 100644 --- a/tsdb/chunkenc/chunk.go +++ b/tsdb/chunkenc/chunk.go @@ -87,6 +87,9 @@ type Chunk interface { // There's no strong guarantee that no samples will be appended once // Compact() is called. Implementing this function is optional. Compact() + + // Reset resets the chunk given stream. + Reset(stream []byte) } type Iterable interface { @@ -303,64 +306,47 @@ func NewPool() Pool { } func (p *pool) Get(e Encoding, b []byte) (Chunk, error) { + var c Chunk switch e { case EncXOR: - c := p.xor.Get().(*XORChunk) - c.b.stream = b - c.b.count = 0 - return c, nil + c = p.xor.Get().(*XORChunk) case EncHistogram: - c := p.histogram.Get().(*HistogramChunk) - c.b.stream = b - c.b.count = 0 - return c, nil + c = p.histogram.Get().(*HistogramChunk) case EncFloatHistogram: - c := p.floatHistogram.Get().(*FloatHistogramChunk) - c.b.stream = b - c.b.count = 0 - return c, nil + c = p.floatHistogram.Get().(*FloatHistogramChunk) + default: + return nil, fmt.Errorf("invalid chunk encoding %q", e) } - return nil, fmt.Errorf("invalid chunk encoding %q", e) + + c.Reset(b) + return c, nil } func (p *pool) Put(c Chunk) error { + var sp *sync.Pool + var ok bool switch c.Encoding() { case EncXOR: - xc, ok := c.(*XORChunk) - // This may happen often with wrapped chunks. Nothing we can really do about - // it but returning an error would cause a lot of allocations again. Thus, - // we just skip it. - if !ok { - return nil - } - xc.b.stream = nil - xc.b.count = 0 - p.xor.Put(c) + _, ok = c.(*XORChunk) + sp = &p.xor case EncHistogram: - sh, ok := c.(*HistogramChunk) - // This may happen often with wrapped chunks. Nothing we can really do about - // it but returning an error would cause a lot of allocations again. Thus, - // we just skip it. - if !ok { - return nil - } - sh.b.stream = nil - sh.b.count = 0 - p.histogram.Put(c) + _, ok = c.(*HistogramChunk) + sp = &p.histogram case EncFloatHistogram: - sh, ok := c.(*FloatHistogramChunk) - // This may happen often with wrapped chunks. Nothing we can really do about - // it but returning an error would cause a lot of allocations again. Thus, - // we just skip it. - if !ok { - return nil - } - sh.b.stream = nil - sh.b.count = 0 - p.floatHistogram.Put(c) + _, ok = c.(*FloatHistogramChunk) + sp = &p.floatHistogram default: return fmt.Errorf("invalid chunk encoding %q", c.Encoding()) } + if !ok { + // This may happen often with wrapped chunks. Nothing we can really do about + // it but returning an error would cause a lot of allocations again. Thus, + // we just skip it. + return nil + } + + c.Reset(nil) + sp.Put(c) return nil } diff --git a/tsdb/chunkenc/chunk_test.go b/tsdb/chunkenc/chunk_test.go index 9db1bf364..b72492a08 100644 --- a/tsdb/chunkenc/chunk_test.go +++ b/tsdb/chunkenc/chunk_test.go @@ -110,6 +110,96 @@ func testChunk(t *testing.T, c Chunk) { require.Equal(t, ValNone, it3.Seek(exp[len(exp)-1].t+1)) } +func TestPool(t *testing.T) { + p := NewPool() + for _, tc := range []struct { + name string + encoding Encoding + expErr error + }{ + { + name: "xor", + encoding: EncXOR, + }, + { + name: "histogram", + encoding: EncHistogram, + }, + { + name: "float histogram", + encoding: EncFloatHistogram, + }, + { + name: "invalid encoding", + encoding: EncNone, + expErr: fmt.Errorf(`invalid chunk encoding "none"`), + }, + } { + t.Run(tc.name, func(t *testing.T) { + c, err := p.Get(tc.encoding, []byte("test")) + if tc.expErr != nil { + require.EqualError(t, err, tc.expErr.Error()) + return + } + + require.NoError(t, err) + + var b *bstream + switch tc.encoding { + case EncHistogram: + b = &c.(*HistogramChunk).b + case EncFloatHistogram: + b = &c.(*FloatHistogramChunk).b + default: + b = &c.(*XORChunk).b + } + + require.Equal(t, &bstream{ + stream: []byte("test"), + count: 0, + }, b) + + b.count = 1 + require.NoError(t, p.Put(c)) + require.Equal(t, &bstream{ + stream: nil, + count: 0, + }, b) + }) + } + + t.Run("put bad chunk wrapper", func(t *testing.T) { + // When a wrapping chunk poses as an encoding it can't be converted to, Put should skip it. + c := fakeChunk{ + encoding: EncXOR, + t: t, + } + require.NoError(t, p.Put(c)) + }) + t.Run("put invalid encoding", func(t *testing.T) { + c := fakeChunk{ + encoding: EncNone, + t: t, + } + require.EqualError(t, p.Put(c), `invalid chunk encoding "none"`) + }) +} + +type fakeChunk struct { + Chunk + + encoding Encoding + t *testing.T +} + +func (c fakeChunk) Encoding() Encoding { + return c.encoding +} + +func (c fakeChunk) Reset([]byte) { + c.t.Fatal("Reset should not be called") +} + func benchmarkIterator(b *testing.B, newChunk func() Chunk) { const samplesPerChunk = 250 var ( diff --git a/tsdb/chunkenc/float_histogram.go b/tsdb/chunkenc/float_histogram.go index 88d189254..1eed46ca8 100644 --- a/tsdb/chunkenc/float_histogram.go +++ b/tsdb/chunkenc/float_histogram.go @@ -44,6 +44,10 @@ func NewFloatHistogramChunk() *FloatHistogramChunk { return &FloatHistogramChunk{b: bstream{stream: b, count: 0}} } +func (c *FloatHistogramChunk) Reset(stream []byte) { + c.b.Reset(stream) +} + // xorValue holds all the necessary information to encode // and decode XOR encoded float64 values. type xorValue struct { diff --git a/tsdb/chunkenc/histogram.go b/tsdb/chunkenc/histogram.go index cb09eda26..e12aec4dc 100644 --- a/tsdb/chunkenc/histogram.go +++ b/tsdb/chunkenc/histogram.go @@ -45,6 +45,10 @@ func NewHistogramChunk() *HistogramChunk { return &HistogramChunk{b: bstream{stream: b, count: 0}} } +func (c *HistogramChunk) Reset(stream []byte) { + c.b.Reset(stream) +} + // Encoding returns the encoding type. func (c *HistogramChunk) Encoding() Encoding { return EncHistogram diff --git a/tsdb/chunkenc/xor.go b/tsdb/chunkenc/xor.go index 07b923831..9430de396 100644 --- a/tsdb/chunkenc/xor.go +++ b/tsdb/chunkenc/xor.go @@ -66,6 +66,10 @@ func NewXORChunk() *XORChunk { return &XORChunk{b: bstream{stream: b, count: 0}} } +func (c *XORChunk) Reset(stream []byte) { + c.b.Reset(stream) +} + // Encoding returns the encoding type. func (c *XORChunk) Encoding() Encoding { return EncXOR @@ -171,7 +175,6 @@ func (a *xorAppender) Append(t int64, v float64) { } a.writeVDelta(v) - default: tDelta = uint64(t - a.t) dod := int64(tDelta - a.tDelta)