From ae7b18ae3473250c29901fa7c3fe149a32b59465 Mon Sep 17 00:00:00 2001 From: Fabian Reinartz Date: Tue, 15 Nov 2016 10:33:34 +0100 Subject: [PATCH] Add chunk package --- chunks/chunk.go | 203 +++++++++++++++++++++++++++++++++++++ chunks/chunk_test.go | 181 +++++++++++++++++++++++++++++++++ chunks/doubledelta.go | 192 +++++++++++++++++++++++++++++++++++ chunks/doubledelta_test.go | 58 +++++++++++ chunks/xor.go | 77 ++++++++++++++ 5 files changed, 711 insertions(+) create mode 100644 chunks/chunk.go create mode 100644 chunks/chunk_test.go create mode 100644 chunks/doubledelta.go create mode 100644 chunks/doubledelta_test.go create mode 100644 chunks/xor.go diff --git a/chunks/chunk.go b/chunks/chunk.go new file mode 100644 index 000000000..2a453d81b --- /dev/null +++ b/chunks/chunk.go @@ -0,0 +1,203 @@ +package chunks + +import ( + "encoding/binary" + "errors" + "fmt" + "io" + "math" + "sync/atomic" + + "github.com/prometheus/common/model" +) + +// Encoding is the identifier for a chunk encoding +type Encoding uint8 + +func (e Encoding) String() string { + switch e { + case EncNone: + return "none" + case EncPlain: + return "plain" + case EncXOR: + return "XOR" + case EncDoubleDelta: + return "doubleDelta" + } + return "" +} + +// The different available chunk encodings. +const ( + EncNone Encoding = iota + EncPlain + EncXOR + EncDoubleDelta +) + +var ( + // ErrChunkFull is returned if the remaining size of a chunk cannot + // fit the appended data. + ErrChunkFull = errors.New("chunk full") +) + +// Chunk holds a sequence of sample pairs that can be iterated over and appended to. +type Chunk interface { + Data() []byte + Appender() Appender + Iterator() Iterator +} + +// FromData returns a chunk from a byte slice of chunk data. +func FromData(d []byte) (Chunk, error) { + if len(d) == 0 { + return nil, fmt.Errorf("no data") + } + e := Encoding(d[0]) + + switch e { + case EncPlain: + rc := rawChunk{d: d, l: uint64(len(d))} + return &PlainChunk{rawChunk: rc}, nil + } + return nil, fmt.Errorf("unknown chunk encoding: %d", e) +} + +// Iterator provides iterating access over sample pairs in a chunk. +type Iterator interface { + // Seek moves the iterator to the element at or after the given time + // and returns the sample pair at the position. + Seek(model.Time) (model.SamplePair, bool) + // Next returns the next sample pair in the iterator. + Next() (model.SamplePair, bool) + + // SeekBefore(model.Time) (model.SamplePair, bool) + First() (model.SamplePair, bool) + // Last() (model.SamplePair, bool) + + // Err returns a non-nil error if Next or Seek returned false. + // Their behavior on subsequent calls after one of them returned false + // is undefined. + Err() error +} + +// Appender adds sample pairs to a chunk. +type Appender interface { + Append(model.Time, model.SampleValue) error +} + +// rawChunk provides a basic byte slice and is used by higher-level +// Chunk implementations. It can be safely appended to without causing +// any further allocations. +type rawChunk struct { + d []byte + l uint64 +} + +func newRawChunk(sz int, enc Encoding) rawChunk { + c := rawChunk{d: make([]byte, sz), l: 1} + c.d[0] = byte(enc) + return c +} + +func (c *rawChunk) encoding() Encoding { + return Encoding(c.d[0]) +} + +func (c *rawChunk) Data() []byte { + return c.d[:c.l] +} + +func (c *rawChunk) append(b []byte) error { + if len(b) > len(c.d)-int(c.l) { + return ErrChunkFull + } + copy(c.d[c.l:], b) + // Atomically increment the length so we can safely retrieve iterators + // for a chunk that is being appended to. + // This does not make it safe for concurrent appends! + atomic.AddUint64(&c.l, uint64(len(b))) + return nil +} + +// PlainChunk implements a Chunk using simple 16 byte representations +// of sample pairs. +type PlainChunk struct { + rawChunk +} + +// NewPlainChunk returns a new chunk using EncPlain. +func NewPlainChunk(sz int) *PlainChunk { + return &PlainChunk{rawChunk: newRawChunk(sz, EncPlain)} +} + +// Iterator implements the Chunk interface. +func (c *PlainChunk) Iterator() Iterator { + return &plainChunkIterator{c: c.d[1:c.l]} +} + +// Appender implements the Chunk interface. +func (c *PlainChunk) Appender() Appender { + return &plainChunkAppender{c: &c.rawChunk} +} + +type plainChunkAppender struct { + c *rawChunk +} + +// Append implements the Appender interface, +func (a *plainChunkAppender) Append(ts model.Time, v model.SampleValue) error { + b := make([]byte, 16) + binary.LittleEndian.PutUint64(b, uint64(ts)) + binary.LittleEndian.PutUint64(b[8:], math.Float64bits(float64(v))) + return a.c.append(b) +} + +type plainChunkIterator struct { + c []byte // chunk data + pos int // position of last emitted element + err error // last error +} + +func (it *plainChunkIterator) Err() error { + return it.err +} + +func (it *plainChunkIterator) timeAt(pos int) model.Time { + return model.Time(binary.LittleEndian.Uint64(it.c[pos:])) +} + +func (it *plainChunkIterator) valueAt(pos int) model.SampleValue { + return model.SampleValue(math.Float64frombits(binary.LittleEndian.Uint64(it.c[pos:]))) +} + +func (it *plainChunkIterator) First() (model.SamplePair, bool) { + it.pos = 0 + return it.Next() +} + +func (it *plainChunkIterator) Seek(ts model.Time) (model.SamplePair, bool) { + for it.pos = 0; it.pos < len(it.c); it.pos += 16 { + if t := it.timeAt(it.pos); t >= ts { + return model.SamplePair{ + Timestamp: t, + Value: it.valueAt(it.pos + 8), + }, true + } + } + it.err = io.EOF + return model.SamplePair{}, false +} + +func (it *plainChunkIterator) Next() (model.SamplePair, bool) { + it.pos += 16 + if it.pos >= len(it.c) { + it.err = io.EOF + return model.SamplePair{}, false + } + return model.SamplePair{ + Timestamp: it.timeAt(it.pos), + Value: it.valueAt(it.pos + 8), + }, true +} diff --git a/chunks/chunk_test.go b/chunks/chunk_test.go new file mode 100644 index 000000000..b351390de --- /dev/null +++ b/chunks/chunk_test.go @@ -0,0 +1,181 @@ +package chunks + +import ( + "encoding/binary" + "fmt" + "io" + "math" + "math/rand" + "testing" + + "github.com/prometheus/common/model" + "github.com/stretchr/testify/require" +) + +func TestRawChunkAppend(t *testing.T) { + c := newRawChunk(1, 'a') + require.NoError(t, c.append(nil)) + require.Error(t, c.append([]byte("t"))) + + c = newRawChunk(5, 'a') + require.NoError(t, c.append([]byte("test"))) + require.Error(t, c.append([]byte("x"))) + require.Equal(t, rawChunk{d: []byte("atest"), l: 5}, c) + require.Equal(t, []byte("atest"), c.Data()) +} + +func TestPlainAppender(t *testing.T) { + c := NewPlainChunk(3*16 + 1) + a := c.Appender() + + require.NoError(t, a.Append(1, 1)) + require.NoError(t, a.Append(2, 2)) + require.NoError(t, a.Append(3, 3)) + require.Equal(t, ErrChunkFull, a.Append(4, 4)) + + exp := []byte{byte(EncPlain)} + b := make([]byte, 8) + for i := 1; i <= 3; i++ { + binary.LittleEndian.PutUint64(b, uint64(i)) + exp = append(exp, b...) + binary.LittleEndian.PutUint64(b, math.Float64bits(float64(i))) + exp = append(exp, b...) + } + require.Equal(t, exp, c.Data()) +} + +func TestPlainIterator(t *testing.T) { + c := NewPlainChunk(100*16 + 1) + a := c.Appender() + + var exp []model.SamplePair + for i := 0; i < 100; i++ { + exp = append(exp, model.SamplePair{ + Timestamp: model.Time(i * 2), + Value: model.SampleValue(i * 2), + }) + require.NoError(t, a.Append(model.Time(i*2), model.SampleValue(i*2))) + } + + it := c.Iterator() + + var res1 []model.SamplePair + for s, ok := it.Seek(0); ok; s, ok = it.Next() { + res1 = append(res1, s) + } + require.Equal(t, io.EOF, it.Err()) + require.Equal(t, exp, res1) + + var res2 []model.SamplePair + for s, ok := it.Seek(11); ok; s, ok = it.Next() { + res2 = append(res2, s) + } + require.Equal(t, io.EOF, it.Err()) + require.Equal(t, exp[6:], res2) +} + +func benchmarkIterator(b *testing.B, newChunk func(int) Chunk) { + var ( + baseT = model.Now() + baseV = 1243535 + ) + var exp []model.SamplePair + for i := 0; i < b.N; i++ { + baseT += model.Time(rand.Intn(10000)) + baseV += rand.Intn(10000) + exp = append(exp, model.SamplePair{ + Timestamp: baseT, + Value: model.SampleValue(baseV), + }) + } + var chunks []Chunk + for i := 0; i < b.N; { + c := newChunk(1024) + a := c.Appender() + for i < b.N { + if err := a.Append(exp[i].Timestamp, exp[i].Value); err == ErrChunkFull { + break + } else if err != nil { + b.Fatal(err) + } + i++ + } + chunks = append(chunks, c) + } + + b.ReportAllocs() + b.ResetTimer() + + res := make([]model.SamplePair, 0, 1024) + for i := 0; i < len(chunks); i++ { + c := chunks[i] + it := c.Iterator() + + for s, ok := it.First(); ok; s, ok = it.Next() { + res = append(res, s) + } + if it.Err() != io.EOF { + b.Fatal(it.Err()) + } + res = res[:0] + } +} + +func BenchmarkPlainIterator(b *testing.B) { + benchmarkIterator(b, func(sz int) Chunk { + return NewPlainChunk(sz) + }) +} + +func BenchmarkDoubleDeltaIterator(b *testing.B) { + benchmarkIterator(b, func(sz int) Chunk { + return NewDoubleDeltaChunk(sz) + }) +} + +func benchmarkAppender(b *testing.B, newChunk func(int) Chunk) { + var ( + baseT = model.Now() + baseV = 1243535 + ) + var exp []model.SamplePair + for i := 0; i < b.N; i++ { + baseT += model.Time(rand.Intn(10000)) + baseV += rand.Intn(10000) + exp = append(exp, model.SamplePair{ + Timestamp: baseT, + Value: model.SampleValue(baseV), + }) + } + + b.ReportAllocs() + b.ResetTimer() + + var chunks []Chunk + for i := 0; i < b.N; { + c := newChunk(1024) + a := c.Appender() + for i < b.N { + if err := a.Append(exp[i].Timestamp, exp[i].Value); err == ErrChunkFull { + break + } else if err != nil { + b.Fatal(err) + } + i++ + } + chunks = append(chunks, c) + } + fmt.Println("created chunks", len(chunks)) +} + +func BenchmarkPlainAppender(b *testing.B) { + benchmarkAppender(b, func(sz int) Chunk { + return NewPlainChunk(sz) + }) +} + +func BenchmarkDoubleDeltaAppender(b *testing.B) { + benchmarkAppender(b, func(sz int) Chunk { + return NewDoubleDeltaChunk(sz) + }) +} diff --git a/chunks/doubledelta.go b/chunks/doubledelta.go new file mode 100644 index 000000000..0ac4655e3 --- /dev/null +++ b/chunks/doubledelta.go @@ -0,0 +1,192 @@ +package chunks + +import ( + "encoding/binary" + "errors" + "io" + "math" + + "github.com/prometheus/common/model" +) + +// DoubleDeltaChunk stores delta-delta encoded sample data. +type DoubleDeltaChunk struct { + rawChunk +} + +// NewDoubleDeltaChunk returns a new chunk using double delta encoding. +func NewDoubleDeltaChunk(sz int) *DoubleDeltaChunk { + return &DoubleDeltaChunk{rawChunk: newRawChunk(sz, EncDoubleDelta)} +} + +// Iterator implements the Chunk interface. +func (c *DoubleDeltaChunk) Iterator() Iterator { + return &doubleDeltaIterator{d: c.d[1:c.l]} +} + +// Appender implements the Chunk interface. +func (c *DoubleDeltaChunk) Appender() Appender { + return &doubleDeltaAppender{c: &c.rawChunk} +} + +type doubleDeltaIterator struct { + d []byte + + err error + pos, num int + curT, curV int64 + nextT, nextV int64 + deltaV int64 + deltaT uint64 +} + +func (it *doubleDeltaIterator) Err() error { + return it.err +} + +func (it *doubleDeltaIterator) readPair() bool { + if len(it.d) == it.pos { + return false + } + var ( + n, m int + ddv int64 + ddt uint64 + ) + it.curT = it.nextT + it.curV = it.nextV + + if it.num > 1 { + ddt, n = binary.Uvarint(it.d[it.pos:]) + ddv, m = binary.Varint(it.d[it.pos+n:]) + it.deltaT += ddt + it.deltaV += ddv + it.nextT += int64(it.deltaT) + it.nextV += it.deltaV + } else if it.num == 1 { + it.deltaT, n = binary.Uvarint(it.d[it.pos:]) + it.deltaV, m = binary.Varint(it.d[it.pos+n:]) + it.nextT += int64(it.deltaT) + it.nextV += it.deltaV + } else { + it.nextT, n = binary.Varint(it.d[it.pos:]) + it.nextV, m = binary.Varint(it.d[it.pos+n:]) + } + it.pos += n + m + it.num++ + return true +} + +func (it *doubleDeltaIterator) First() (model.SamplePair, bool) { + it.pos = 0 + it.num = 0 + if !it.readPair() { + it.err = io.EOF + return model.SamplePair{}, false + } + return it.Next() +} + +func (it *doubleDeltaIterator) Seek(ts model.Time) (model.SamplePair, bool) { + if int64(ts) < it.nextT { + it.pos = 0 + it.num = 0 + if !it.readPair() { + it.err = io.EOF + return model.SamplePair{}, false + } + if _, ok := it.Next(); !ok { + return model.SamplePair{}, false + } + } + for { + if it.nextT > int64(ts) { + if it.num < 2 { + it.err = io.EOF + return model.SamplePair{}, false + } + return model.SamplePair{ + Timestamp: model.Time(it.curT), + Value: model.SampleValue(it.curV), + }, true + } + if _, ok := it.Next(); !ok { + return model.SamplePair{}, false + } + } +} + +func (it *doubleDeltaIterator) Next() (model.SamplePair, bool) { + if it.err == io.EOF { + return model.SamplePair{}, false + } + res := model.SamplePair{ + Timestamp: model.Time(it.nextT), + Value: model.SampleValue(it.nextV), + } + if !it.readPair() { + it.err = io.EOF + } + return res, true +} + +type doubleDeltaAppender struct { + c *rawChunk + buf [16]byte + num int // stored values so far. + + lastV, lastVDelta int64 + lastT int64 + lastTDelta uint64 +} + +func isInt(f model.SampleValue) (int64, bool) { + x, frac := math.Modf(float64(f)) + if frac != 0 { + return 0, false + } + return int64(x), true +} + +// ErrNoInteger is returned if a non-integer is appended to +// a double delta chunk. +var ErrNoInteger = errors.New("not an integer") + +func (a *doubleDeltaAppender) Append(ts model.Time, fv model.SampleValue) error { + v, ok := isInt(fv) + if !ok { + return ErrNoInteger + } + if a.num == 0 { + n := binary.PutVarint(a.buf[:], int64(ts)) + n += binary.PutVarint(a.buf[n:], v) + if err := a.c.append(a.buf[:n]); err != nil { + return err + } + a.lastT, a.lastV = int64(ts), v + a.num++ + return nil + } + if a.num == 1 { + a.lastTDelta, a.lastVDelta = uint64(int64(ts)-a.lastT), v-a.lastV + n := binary.PutUvarint(a.buf[:], a.lastTDelta) + n += binary.PutVarint(a.buf[n:], a.lastVDelta) + if err := a.c.append(a.buf[:n]); err != nil { + return err + } + } else { + predT, predV := a.lastT+int64(a.lastTDelta), a.lastV+a.lastVDelta + tdd := uint64(int64(ts) - predT) + vdd := v - predV + n := binary.PutUvarint(a.buf[:], tdd) + n += binary.PutVarint(a.buf[n:], vdd) + if err := a.c.append(a.buf[:n]); err != nil { + return err + } + a.lastTDelta += tdd + a.lastVDelta += vdd + } + a.lastT, a.lastV = int64(ts), v + a.num++ + return nil +} diff --git a/chunks/doubledelta_test.go b/chunks/doubledelta_test.go new file mode 100644 index 000000000..d62725b97 --- /dev/null +++ b/chunks/doubledelta_test.go @@ -0,0 +1,58 @@ +package chunks + +import ( + "io" + "math/rand" + "testing" + + "github.com/prometheus/common/model" + "github.com/stretchr/testify/require" +) + +func testDoubleDeltaChunk(t *testing.T) { + ts := model.Time(14345645) + v := int64(123123) + + var input []model.SamplePair + for i := 0; i < 2000; i++ { + ts += model.Time(rand.Int63n(100) + 1) + v += rand.Int63n(1000) + if rand.Int() > 0 { + v *= -1 + } + + input = append(input, model.SamplePair{ + Timestamp: ts, + Value: model.SampleValue(v), + }) + } + + c := NewDoubleDeltaChunk(rand.Intn(3000)) + + app := c.Appender() + for i, s := range input { + err := app.Append(s.Timestamp, s.Value) + if err == ErrChunkFull { + input = input[:i] + break + } + require.NoError(t, err, "at sample %d: %v", i, s) + } + + result := []model.SamplePair{} + + it := c.Iterator() + for s, ok := it.First(); ok; s, ok = it.Next() { + result = append(result, s) + } + if it.Err() != io.EOF { + require.NoError(t, it.Err()) + } + require.Equal(t, input, result) +} + +func TestDoubleDeltaChunk(t *testing.T) { + for i := 0; i < 10000; i++ { + testDoubleDeltaChunk(t) + } +} diff --git a/chunks/xor.go b/chunks/xor.go new file mode 100644 index 000000000..13f7b7ca4 --- /dev/null +++ b/chunks/xor.go @@ -0,0 +1,77 @@ +package chunks + +import ( + "encoding/binary" + "math" + + "github.com/prometheus/common/model" +) + +// XORChunk holds XOR encoded sample data. +type XORChunk struct { + rawChunk +} + +// NewXORChunk returns a new chunk with XOR encoding of the given size. +func NewXORChunk(sz int) *XORChunk { + return &XORChunk{rawChunk: newRawChunk(sz, EncXOR)} +} + +// Appender implements the Chunk interface. +func (c *XORChunk) Appender() Appender { + return &xorAppender{c: &c.rawChunk} +} + +// Iterator implements the Chunk interface. +func (c *XORChunk) Iterator() Iterator { + return &xorIterator{d: c.d[1:c.l]} +} + +type xorAppender struct { + c *rawChunk + num int + buf [16]byte + + lastV float64 + lastT int64 + lastTDelta uint64 +} + +func (a *xorAppender) Append(ts model.Time, v model.SampleValue) error { + if a.num == 0 { + n := binary.PutVarint(a.buf[:], int64(ts)) + binary.BigEndian.PutUint64(a.buf[n:], math.Float64bits(float64(v))) + if err := a.c.append(a.buf[:n+8]); err != nil { + return err + } + a.lastT, a.lastV = int64(ts), float64(v) + a.num++ + return nil + } + if a.num == 1 { + a.lastTDelta = uint64(int64(ts) - a.lastT) + } + + a.num++ + return nil +} + +type xorIterator struct { + d []byte +} + +func (it *xorIterator) First() (model.SamplePair, bool) { + return model.SamplePair{}, false +} + +func (it *xorIterator) Seek(ts model.Time) (model.SamplePair, bool) { + return model.SamplePair{}, false +} + +func (it *xorIterator) Next() (model.SamplePair, bool) { + return model.SamplePair{}, false +} + +func (it *xorIterator) Err() error { + return nil +}