Add chunk package

This commit is contained in:
Fabian Reinartz 2016-11-15 10:33:34 +01:00
parent 7771cdb519
commit ae7b18ae34
5 changed files with 711 additions and 0 deletions

203
chunks/chunk.go Normal file
View File

@ -0,0 +1,203 @@
package chunks
import (
"encoding/binary"
"errors"
"fmt"
"io"
"math"
"sync/atomic"
"github.com/prometheus/common/model"
)
// Encoding is the identifier for a chunk encoding
type Encoding uint8
func (e Encoding) String() string {
switch e {
case EncNone:
return "none"
case EncPlain:
return "plain"
case EncXOR:
return "XOR"
case EncDoubleDelta:
return "doubleDelta"
}
return "<unknown>"
}
// The different available chunk encodings.
const (
EncNone Encoding = iota
EncPlain
EncXOR
EncDoubleDelta
)
var (
// ErrChunkFull is returned if the remaining size of a chunk cannot
// fit the appended data.
ErrChunkFull = errors.New("chunk full")
)
// Chunk holds a sequence of sample pairs that can be iterated over and appended to.
type Chunk interface {
Data() []byte
Appender() Appender
Iterator() Iterator
}
// FromData returns a chunk from a byte slice of chunk data.
func FromData(d []byte) (Chunk, error) {
if len(d) == 0 {
return nil, fmt.Errorf("no data")
}
e := Encoding(d[0])
switch e {
case EncPlain:
rc := rawChunk{d: d, l: uint64(len(d))}
return &PlainChunk{rawChunk: rc}, nil
}
return nil, fmt.Errorf("unknown chunk encoding: %d", e)
}
// Iterator provides iterating access over sample pairs in a chunk.
type Iterator interface {
// Seek moves the iterator to the element at or after the given time
// and returns the sample pair at the position.
Seek(model.Time) (model.SamplePair, bool)
// Next returns the next sample pair in the iterator.
Next() (model.SamplePair, bool)
// SeekBefore(model.Time) (model.SamplePair, bool)
First() (model.SamplePair, bool)
// Last() (model.SamplePair, bool)
// Err returns a non-nil error if Next or Seek returned false.
// Their behavior on subsequent calls after one of them returned false
// is undefined.
Err() error
}
// Appender adds sample pairs to a chunk.
type Appender interface {
Append(model.Time, model.SampleValue) error
}
// rawChunk provides a basic byte slice and is used by higher-level
// Chunk implementations. It can be safely appended to without causing
// any further allocations.
type rawChunk struct {
d []byte
l uint64
}
func newRawChunk(sz int, enc Encoding) rawChunk {
c := rawChunk{d: make([]byte, sz), l: 1}
c.d[0] = byte(enc)
return c
}
func (c *rawChunk) encoding() Encoding {
return Encoding(c.d[0])
}
func (c *rawChunk) Data() []byte {
return c.d[:c.l]
}
func (c *rawChunk) append(b []byte) error {
if len(b) > len(c.d)-int(c.l) {
return ErrChunkFull
}
copy(c.d[c.l:], b)
// Atomically increment the length so we can safely retrieve iterators
// for a chunk that is being appended to.
// This does not make it safe for concurrent appends!
atomic.AddUint64(&c.l, uint64(len(b)))
return nil
}
// PlainChunk implements a Chunk using simple 16 byte representations
// of sample pairs.
type PlainChunk struct {
rawChunk
}
// NewPlainChunk returns a new chunk using EncPlain.
func NewPlainChunk(sz int) *PlainChunk {
return &PlainChunk{rawChunk: newRawChunk(sz, EncPlain)}
}
// Iterator implements the Chunk interface.
func (c *PlainChunk) Iterator() Iterator {
return &plainChunkIterator{c: c.d[1:c.l]}
}
// Appender implements the Chunk interface.
func (c *PlainChunk) Appender() Appender {
return &plainChunkAppender{c: &c.rawChunk}
}
type plainChunkAppender struct {
c *rawChunk
}
// Append implements the Appender interface,
func (a *plainChunkAppender) Append(ts model.Time, v model.SampleValue) error {
b := make([]byte, 16)
binary.LittleEndian.PutUint64(b, uint64(ts))
binary.LittleEndian.PutUint64(b[8:], math.Float64bits(float64(v)))
return a.c.append(b)
}
type plainChunkIterator struct {
c []byte // chunk data
pos int // position of last emitted element
err error // last error
}
func (it *plainChunkIterator) Err() error {
return it.err
}
func (it *plainChunkIterator) timeAt(pos int) model.Time {
return model.Time(binary.LittleEndian.Uint64(it.c[pos:]))
}
func (it *plainChunkIterator) valueAt(pos int) model.SampleValue {
return model.SampleValue(math.Float64frombits(binary.LittleEndian.Uint64(it.c[pos:])))
}
func (it *plainChunkIterator) First() (model.SamplePair, bool) {
it.pos = 0
return it.Next()
}
func (it *plainChunkIterator) Seek(ts model.Time) (model.SamplePair, bool) {
for it.pos = 0; it.pos < len(it.c); it.pos += 16 {
if t := it.timeAt(it.pos); t >= ts {
return model.SamplePair{
Timestamp: t,
Value: it.valueAt(it.pos + 8),
}, true
}
}
it.err = io.EOF
return model.SamplePair{}, false
}
func (it *plainChunkIterator) Next() (model.SamplePair, bool) {
it.pos += 16
if it.pos >= len(it.c) {
it.err = io.EOF
return model.SamplePair{}, false
}
return model.SamplePair{
Timestamp: it.timeAt(it.pos),
Value: it.valueAt(it.pos + 8),
}, true
}

181
chunks/chunk_test.go Normal file
View File

@ -0,0 +1,181 @@
package chunks
import (
"encoding/binary"
"fmt"
"io"
"math"
"math/rand"
"testing"
"github.com/prometheus/common/model"
"github.com/stretchr/testify/require"
)
func TestRawChunkAppend(t *testing.T) {
c := newRawChunk(1, 'a')
require.NoError(t, c.append(nil))
require.Error(t, c.append([]byte("t")))
c = newRawChunk(5, 'a')
require.NoError(t, c.append([]byte("test")))
require.Error(t, c.append([]byte("x")))
require.Equal(t, rawChunk{d: []byte("atest"), l: 5}, c)
require.Equal(t, []byte("atest"), c.Data())
}
func TestPlainAppender(t *testing.T) {
c := NewPlainChunk(3*16 + 1)
a := c.Appender()
require.NoError(t, a.Append(1, 1))
require.NoError(t, a.Append(2, 2))
require.NoError(t, a.Append(3, 3))
require.Equal(t, ErrChunkFull, a.Append(4, 4))
exp := []byte{byte(EncPlain)}
b := make([]byte, 8)
for i := 1; i <= 3; i++ {
binary.LittleEndian.PutUint64(b, uint64(i))
exp = append(exp, b...)
binary.LittleEndian.PutUint64(b, math.Float64bits(float64(i)))
exp = append(exp, b...)
}
require.Equal(t, exp, c.Data())
}
func TestPlainIterator(t *testing.T) {
c := NewPlainChunk(100*16 + 1)
a := c.Appender()
var exp []model.SamplePair
for i := 0; i < 100; i++ {
exp = append(exp, model.SamplePair{
Timestamp: model.Time(i * 2),
Value: model.SampleValue(i * 2),
})
require.NoError(t, a.Append(model.Time(i*2), model.SampleValue(i*2)))
}
it := c.Iterator()
var res1 []model.SamplePair
for s, ok := it.Seek(0); ok; s, ok = it.Next() {
res1 = append(res1, s)
}
require.Equal(t, io.EOF, it.Err())
require.Equal(t, exp, res1)
var res2 []model.SamplePair
for s, ok := it.Seek(11); ok; s, ok = it.Next() {
res2 = append(res2, s)
}
require.Equal(t, io.EOF, it.Err())
require.Equal(t, exp[6:], res2)
}
func benchmarkIterator(b *testing.B, newChunk func(int) Chunk) {
var (
baseT = model.Now()
baseV = 1243535
)
var exp []model.SamplePair
for i := 0; i < b.N; i++ {
baseT += model.Time(rand.Intn(10000))
baseV += rand.Intn(10000)
exp = append(exp, model.SamplePair{
Timestamp: baseT,
Value: model.SampleValue(baseV),
})
}
var chunks []Chunk
for i := 0; i < b.N; {
c := newChunk(1024)
a := c.Appender()
for i < b.N {
if err := a.Append(exp[i].Timestamp, exp[i].Value); err == ErrChunkFull {
break
} else if err != nil {
b.Fatal(err)
}
i++
}
chunks = append(chunks, c)
}
b.ReportAllocs()
b.ResetTimer()
res := make([]model.SamplePair, 0, 1024)
for i := 0; i < len(chunks); i++ {
c := chunks[i]
it := c.Iterator()
for s, ok := it.First(); ok; s, ok = it.Next() {
res = append(res, s)
}
if it.Err() != io.EOF {
b.Fatal(it.Err())
}
res = res[:0]
}
}
func BenchmarkPlainIterator(b *testing.B) {
benchmarkIterator(b, func(sz int) Chunk {
return NewPlainChunk(sz)
})
}
func BenchmarkDoubleDeltaIterator(b *testing.B) {
benchmarkIterator(b, func(sz int) Chunk {
return NewDoubleDeltaChunk(sz)
})
}
func benchmarkAppender(b *testing.B, newChunk func(int) Chunk) {
var (
baseT = model.Now()
baseV = 1243535
)
var exp []model.SamplePair
for i := 0; i < b.N; i++ {
baseT += model.Time(rand.Intn(10000))
baseV += rand.Intn(10000)
exp = append(exp, model.SamplePair{
Timestamp: baseT,
Value: model.SampleValue(baseV),
})
}
b.ReportAllocs()
b.ResetTimer()
var chunks []Chunk
for i := 0; i < b.N; {
c := newChunk(1024)
a := c.Appender()
for i < b.N {
if err := a.Append(exp[i].Timestamp, exp[i].Value); err == ErrChunkFull {
break
} else if err != nil {
b.Fatal(err)
}
i++
}
chunks = append(chunks, c)
}
fmt.Println("created chunks", len(chunks))
}
func BenchmarkPlainAppender(b *testing.B) {
benchmarkAppender(b, func(sz int) Chunk {
return NewPlainChunk(sz)
})
}
func BenchmarkDoubleDeltaAppender(b *testing.B) {
benchmarkAppender(b, func(sz int) Chunk {
return NewDoubleDeltaChunk(sz)
})
}

192
chunks/doubledelta.go Normal file
View File

@ -0,0 +1,192 @@
package chunks
import (
"encoding/binary"
"errors"
"io"
"math"
"github.com/prometheus/common/model"
)
// DoubleDeltaChunk stores delta-delta encoded sample data.
type DoubleDeltaChunk struct {
rawChunk
}
// NewDoubleDeltaChunk returns a new chunk using double delta encoding.
func NewDoubleDeltaChunk(sz int) *DoubleDeltaChunk {
return &DoubleDeltaChunk{rawChunk: newRawChunk(sz, EncDoubleDelta)}
}
// Iterator implements the Chunk interface.
func (c *DoubleDeltaChunk) Iterator() Iterator {
return &doubleDeltaIterator{d: c.d[1:c.l]}
}
// Appender implements the Chunk interface.
func (c *DoubleDeltaChunk) Appender() Appender {
return &doubleDeltaAppender{c: &c.rawChunk}
}
type doubleDeltaIterator struct {
d []byte
err error
pos, num int
curT, curV int64
nextT, nextV int64
deltaV int64
deltaT uint64
}
func (it *doubleDeltaIterator) Err() error {
return it.err
}
func (it *doubleDeltaIterator) readPair() bool {
if len(it.d) == it.pos {
return false
}
var (
n, m int
ddv int64
ddt uint64
)
it.curT = it.nextT
it.curV = it.nextV
if it.num > 1 {
ddt, n = binary.Uvarint(it.d[it.pos:])
ddv, m = binary.Varint(it.d[it.pos+n:])
it.deltaT += ddt
it.deltaV += ddv
it.nextT += int64(it.deltaT)
it.nextV += it.deltaV
} else if it.num == 1 {
it.deltaT, n = binary.Uvarint(it.d[it.pos:])
it.deltaV, m = binary.Varint(it.d[it.pos+n:])
it.nextT += int64(it.deltaT)
it.nextV += it.deltaV
} else {
it.nextT, n = binary.Varint(it.d[it.pos:])
it.nextV, m = binary.Varint(it.d[it.pos+n:])
}
it.pos += n + m
it.num++
return true
}
func (it *doubleDeltaIterator) First() (model.SamplePair, bool) {
it.pos = 0
it.num = 0
if !it.readPair() {
it.err = io.EOF
return model.SamplePair{}, false
}
return it.Next()
}
func (it *doubleDeltaIterator) Seek(ts model.Time) (model.SamplePair, bool) {
if int64(ts) < it.nextT {
it.pos = 0
it.num = 0
if !it.readPair() {
it.err = io.EOF
return model.SamplePair{}, false
}
if _, ok := it.Next(); !ok {
return model.SamplePair{}, false
}
}
for {
if it.nextT > int64(ts) {
if it.num < 2 {
it.err = io.EOF
return model.SamplePair{}, false
}
return model.SamplePair{
Timestamp: model.Time(it.curT),
Value: model.SampleValue(it.curV),
}, true
}
if _, ok := it.Next(); !ok {
return model.SamplePair{}, false
}
}
}
func (it *doubleDeltaIterator) Next() (model.SamplePair, bool) {
if it.err == io.EOF {
return model.SamplePair{}, false
}
res := model.SamplePair{
Timestamp: model.Time(it.nextT),
Value: model.SampleValue(it.nextV),
}
if !it.readPair() {
it.err = io.EOF
}
return res, true
}
type doubleDeltaAppender struct {
c *rawChunk
buf [16]byte
num int // stored values so far.
lastV, lastVDelta int64
lastT int64
lastTDelta uint64
}
func isInt(f model.SampleValue) (int64, bool) {
x, frac := math.Modf(float64(f))
if frac != 0 {
return 0, false
}
return int64(x), true
}
// ErrNoInteger is returned if a non-integer is appended to
// a double delta chunk.
var ErrNoInteger = errors.New("not an integer")
func (a *doubleDeltaAppender) Append(ts model.Time, fv model.SampleValue) error {
v, ok := isInt(fv)
if !ok {
return ErrNoInteger
}
if a.num == 0 {
n := binary.PutVarint(a.buf[:], int64(ts))
n += binary.PutVarint(a.buf[n:], v)
if err := a.c.append(a.buf[:n]); err != nil {
return err
}
a.lastT, a.lastV = int64(ts), v
a.num++
return nil
}
if a.num == 1 {
a.lastTDelta, a.lastVDelta = uint64(int64(ts)-a.lastT), v-a.lastV
n := binary.PutUvarint(a.buf[:], a.lastTDelta)
n += binary.PutVarint(a.buf[n:], a.lastVDelta)
if err := a.c.append(a.buf[:n]); err != nil {
return err
}
} else {
predT, predV := a.lastT+int64(a.lastTDelta), a.lastV+a.lastVDelta
tdd := uint64(int64(ts) - predT)
vdd := v - predV
n := binary.PutUvarint(a.buf[:], tdd)
n += binary.PutVarint(a.buf[n:], vdd)
if err := a.c.append(a.buf[:n]); err != nil {
return err
}
a.lastTDelta += tdd
a.lastVDelta += vdd
}
a.lastT, a.lastV = int64(ts), v
a.num++
return nil
}

View File

@ -0,0 +1,58 @@
package chunks
import (
"io"
"math/rand"
"testing"
"github.com/prometheus/common/model"
"github.com/stretchr/testify/require"
)
func testDoubleDeltaChunk(t *testing.T) {
ts := model.Time(14345645)
v := int64(123123)
var input []model.SamplePair
for i := 0; i < 2000; i++ {
ts += model.Time(rand.Int63n(100) + 1)
v += rand.Int63n(1000)
if rand.Int() > 0 {
v *= -1
}
input = append(input, model.SamplePair{
Timestamp: ts,
Value: model.SampleValue(v),
})
}
c := NewDoubleDeltaChunk(rand.Intn(3000))
app := c.Appender()
for i, s := range input {
err := app.Append(s.Timestamp, s.Value)
if err == ErrChunkFull {
input = input[:i]
break
}
require.NoError(t, err, "at sample %d: %v", i, s)
}
result := []model.SamplePair{}
it := c.Iterator()
for s, ok := it.First(); ok; s, ok = it.Next() {
result = append(result, s)
}
if it.Err() != io.EOF {
require.NoError(t, it.Err())
}
require.Equal(t, input, result)
}
func TestDoubleDeltaChunk(t *testing.T) {
for i := 0; i < 10000; i++ {
testDoubleDeltaChunk(t)
}
}

77
chunks/xor.go Normal file
View File

@ -0,0 +1,77 @@
package chunks
import (
"encoding/binary"
"math"
"github.com/prometheus/common/model"
)
// XORChunk holds XOR encoded sample data.
type XORChunk struct {
rawChunk
}
// NewXORChunk returns a new chunk with XOR encoding of the given size.
func NewXORChunk(sz int) *XORChunk {
return &XORChunk{rawChunk: newRawChunk(sz, EncXOR)}
}
// Appender implements the Chunk interface.
func (c *XORChunk) Appender() Appender {
return &xorAppender{c: &c.rawChunk}
}
// Iterator implements the Chunk interface.
func (c *XORChunk) Iterator() Iterator {
return &xorIterator{d: c.d[1:c.l]}
}
type xorAppender struct {
c *rawChunk
num int
buf [16]byte
lastV float64
lastT int64
lastTDelta uint64
}
func (a *xorAppender) Append(ts model.Time, v model.SampleValue) error {
if a.num == 0 {
n := binary.PutVarint(a.buf[:], int64(ts))
binary.BigEndian.PutUint64(a.buf[n:], math.Float64bits(float64(v)))
if err := a.c.append(a.buf[:n+8]); err != nil {
return err
}
a.lastT, a.lastV = int64(ts), float64(v)
a.num++
return nil
}
if a.num == 1 {
a.lastTDelta = uint64(int64(ts) - a.lastT)
}
a.num++
return nil
}
type xorIterator struct {
d []byte
}
func (it *xorIterator) First() (model.SamplePair, bool) {
return model.SamplePair{}, false
}
func (it *xorIterator) Seek(ts model.Time) (model.SamplePair, bool) {
return model.SamplePair{}, false
}
func (it *xorIterator) Next() (model.SamplePair, bool) {
return model.SamplePair{}, false
}
func (it *xorIterator) Err() error {
return nil
}