Merge pull request #9509 from prometheus/beorn7/sparsehistogram

Histogram encoding improvements
This commit is contained in:
Björn Rabenstein 2021-10-18 15:45:20 +02:00 committed by GitHub
commit 8c1507ebaa
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 373 additions and 122 deletions

View File

@ -153,10 +153,8 @@ func (c *HistogramChunk) Appender() (Appender, error) {
sum: it.sum,
leading: it.leading,
trailing: it.trailing,
buf64: make([]byte, binary.MaxVarintLen64),
}
if binary.BigEndian.Uint16(a.b.bytes()) == 0 {
if it.numTotal == 0 {
a.leading = 0xff
}
return a, nil
@ -222,20 +220,6 @@ type HistogramAppender struct {
sum float64
leading uint8
trailing uint8
buf64 []byte // For working on varint64's.
}
// putVarint encodes x as a signed varint into the scratch buffer buf and then
// appends the encoded bytes, in order, to the bstream. buf must be big enough
// for binary.PutVarint (binary.MaxVarintLen64 bytes always suffices).
func putVarint(b *bstream, buf []byte, x int64) {
	n := binary.PutVarint(buf, x)
	for i := 0; i < n; i++ {
		b.writeByte(buf[i])
	}
}
// putUvarint encodes x as an unsigned varint into the scratch buffer buf and
// then appends the encoded bytes, in order, to the bstream. buf must be big
// enough for binary.PutUvarint (binary.MaxVarintLen64 bytes always suffices).
func putUvarint(b *bstream, buf []byte, x uint64) {
	n := binary.PutUvarint(buf, x)
	for i := 0; i < n; i++ {
		b.writeByte(buf[i])
	}
}
// Append implements Appender. This implementation panics because normal float
@ -418,18 +402,21 @@ func (a *HistogramAppender) AppendHistogram(t int64, h histogram.Histogram) {
a.nBucketsDelta = make([]int64, numNBuckets)
// Now store the actual data.
putVarint(a.b, a.buf64, t)
putUvarint(a.b, a.buf64, h.Count) // TODO(beorn7): Use putVarbitInt?
putUvarint(a.b, a.buf64, h.ZeroCount) // TODO(beorn7): Use putVarbitInt?
putVarbitInt(a.b, t)
putVarbitUint(a.b, h.Count)
putVarbitUint(a.b, h.ZeroCount) //
a.b.writeBits(math.Float64bits(h.Sum), 64)
for _, buck := range h.PositiveBuckets {
putVarint(a.b, a.buf64, buck) // TODO(beorn7): Use putVarbitInt?
for _, b := range h.PositiveBuckets {
putVarbitInt(a.b, b)
}
for _, buck := range h.NegativeBuckets {
putVarint(a.b, a.buf64, buck) // TODO(beorn7): Use putVarbitInt?
for _, b := range h.NegativeBuckets {
putVarbitInt(a.b, b)
}
case 1:
tDelta = t - a.t
if tDelta < 0 {
panic("out of order timestamp")
}
cntDelta = int64(h.Count) - int64(a.cnt)
zCntDelta = int64(h.ZeroCount) - int64(a.zCnt)
@ -437,20 +424,20 @@ func (a *HistogramAppender) AppendHistogram(t int64, h histogram.Histogram) {
cntDelta, zCntDelta = 0, 0
}
putVarint(a.b, a.buf64, tDelta) // TODO(beorn7): This should probably be putUvarint.
putVarint(a.b, a.buf64, cntDelta) // TODO(beorn7): Use putVarbitInt?
putVarint(a.b, a.buf64, zCntDelta) // TODO(beorn7): Use putVarbitInt?
putVarbitUint(a.b, uint64(tDelta))
putVarbitInt(a.b, cntDelta)
putVarbitInt(a.b, zCntDelta)
a.writeSumDelta(h.Sum)
for i, buck := range h.PositiveBuckets {
delta := buck - a.pBuckets[i]
putVarint(a.b, a.buf64, delta) // TODO(beorn7): Use putVarbitInt?
for i, b := range h.PositiveBuckets {
delta := b - a.pBuckets[i]
putVarbitInt(a.b, delta)
a.pBucketsDelta[i] = delta
}
for i, buck := range h.NegativeBuckets {
delta := buck - a.nBuckets[i]
putVarint(a.b, a.buf64, delta) // TODO(beorn7): Use putVarbitInt?
for i, b := range h.NegativeBuckets {
delta := b - a.nBuckets[i]
putVarbitInt(a.b, delta)
a.nBucketsDelta[i] = delta
}
@ -721,21 +708,21 @@ func (it *histogramIterator) Next() bool {
}
// Now read the actual data.
t, err := binary.ReadVarint(&it.br)
t, err := readVarbitInt(&it.br)
if err != nil {
it.err = err
return false
}
it.t = t
cnt, err := binary.ReadUvarint(&it.br)
cnt, err := readVarbitUint(&it.br)
if err != nil {
it.err = err
return false
}
it.cnt = cnt
zcnt, err := binary.ReadUvarint(&it.br)
zcnt, err := readVarbitUint(&it.br)
if err != nil {
it.err = err
return false
@ -750,7 +737,7 @@ func (it *histogramIterator) Next() bool {
it.sum = math.Float64frombits(sum)
for i := range it.pBuckets {
v, err := binary.ReadVarint(&it.br)
v, err := readVarbitInt(&it.br)
if err != nil {
it.err = err
return false
@ -758,7 +745,7 @@ func (it *histogramIterator) Next() bool {
it.pBuckets[i] = v
}
for i := range it.nBuckets {
v, err := binary.ReadVarint(&it.br)
v, err := readVarbitInt(&it.br)
if err != nil {
it.err = err
return false
@ -771,15 +758,15 @@ func (it *histogramIterator) Next() bool {
}
if it.numRead == 1 {
tDelta, err := binary.ReadVarint(&it.br)
tDelta, err := readVarbitUint(&it.br)
if err != nil {
it.err = err
return false
}
it.tDelta = tDelta
it.t += int64(it.tDelta)
it.tDelta = int64(tDelta)
it.t += it.tDelta
cntDelta, err := binary.ReadVarint(&it.br)
cntDelta, err := readVarbitInt(&it.br)
if err != nil {
it.err = err
return false
@ -787,7 +774,7 @@ func (it *histogramIterator) Next() bool {
it.cntDelta = cntDelta
it.cnt = uint64(int64(it.cnt) + it.cntDelta)
zcntDelta, err := binary.ReadVarint(&it.br)
zcntDelta, err := readVarbitInt(&it.br)
if err != nil {
it.err = err
return false
@ -806,7 +793,7 @@ func (it *histogramIterator) Next() bool {
}
for i := range it.pBuckets {
delta, err := binary.ReadVarint(&it.br)
delta, err := readVarbitInt(&it.br)
if err != nil {
it.err = err
return false
@ -816,7 +803,7 @@ func (it *histogramIterator) Next() bool {
}
for i := range it.nBuckets {
delta, err := binary.ReadVarint(&it.br)
delta, err := readVarbitInt(&it.br)
if err != nil {
it.err = err
return false

View File

@ -14,40 +14,34 @@
package chunkenc
import (
"math"
"github.com/prometheus/prometheus/model/histogram"
)
func writeHistogramChunkLayout(b *bstream, schema int32, zeroThreshold float64, positiveSpans, negativeSpans []histogram.Span) {
putZeroThreshold(b, zeroThreshold)
putVarbitInt(b, int64(schema))
putVarbitFloat(b, zeroThreshold)
putHistogramChunkLayoutSpans(b, positiveSpans)
putHistogramChunkLayoutSpans(b, negativeSpans)
}
// putHistogramChunkLayoutSpans writes the number of spans followed by the
// Length and Offset of every span, in order. All three kinds of values are
// written with putVarbitInt.
func putHistogramChunkLayoutSpans(b *bstream, spans []histogram.Span) {
	count := len(spans)
	putVarbitInt(b, int64(count))
	for i := 0; i < count; i++ {
		putVarbitInt(b, int64(spans[i].Length))
		putVarbitInt(b, int64(spans[i].Offset))
	}
}
func readHistogramChunkLayout(b *bstreamReader) (
schema int32, zeroThreshold float64,
positiveSpans, negativeSpans []histogram.Span,
err error,
) {
zeroThreshold, err = readZeroThreshold(b)
if err != nil {
return
}
v, err := readVarbitInt(b)
if err != nil {
return
}
schema = int32(v)
zeroThreshold, err = readVarbitFloat(b)
if err != nil {
return
}
positiveSpans, err = readHistogramChunkLayoutSpans(b)
if err != nil {
return
@ -61,15 +55,23 @@ func readHistogramChunkLayout(b *bstreamReader) (
return
}
// putHistogramChunkLayoutSpans writes the number of spans followed by the
// Length and Offset of every span, in order. The span count and each Length
// are written with putVarbitUint, each Offset with putVarbitInt.
func putHistogramChunkLayoutSpans(b *bstream, spans []histogram.Span) {
	count := len(spans)
	putVarbitUint(b, uint64(count))
	for i := 0; i < count; i++ {
		putVarbitUint(b, uint64(spans[i].Length))
		putVarbitInt(b, int64(spans[i].Offset))
	}
}
func readHistogramChunkLayoutSpans(b *bstreamReader) ([]histogram.Span, error) {
var spans []histogram.Span
num, err := readVarbitInt(b)
num, err := readVarbitUint(b)
if err != nil {
return nil, err
}
for i := 0; i < int(num); i++ {
length, err := readVarbitInt(b)
length, err := readVarbitUint(b)
if err != nil {
return nil, err
}
@ -87,6 +89,57 @@ func readHistogramChunkLayoutSpans(b *bstreamReader) ([]histogram.Span, error) {
return spans, nil
}
// putZeroThreshold writes the zero threshold to the bstream, using one byte
// for typical values and nine bytes for everything else:
//
// * A threshold of 0 is stored as a single zero byte.
//
// * A threshold that is a power of 2 between (and including) 2^-243 and 2^10
//   is stored as a single byte between 1 and 254. math.Frexp reports such a
//   value as 0.5*2^exp with exp between (and including) -242 and 11 (2^-243
//   is 0.5*2^-242, and 2^10 is 0.5*2^11); the stored byte is exp+243. Small
//   powers of two are the preferred values for the zero threshold. The
//   default value of 2^-128 (0.5*2^-127) is therefore stored as the single
//   byte 116.
//
// * Every other threshold is stored as the byte 255 followed by the 8 bytes
//   of the threshold as a float64, i.e. 9 bytes in total.
func putZeroThreshold(b *bstream, threshold float64) {
	if threshold == 0 {
		b.writeByte(0)
		return
	}
	mantissa, exponent := math.Frexp(threshold)
	if mantissa == 0.5 && exponent >= -242 && exponent <= 11 {
		// Power of two inside the compact range: one byte is enough.
		b.writeByte(byte(exponent + 243))
		return
	}
	// Fallback: marker byte followed by the raw float64 bits.
	b.writeByte(255)
	b.writeBits(math.Float64bits(threshold), 64)
}
// readZeroThreshold reads the zero threshold written with putZeroThreshold.
//
// The first byte selects the encoding:
//
// * 0 means the threshold is exactly 0.
//
// * 255 means the next 64 bits are the float64 bits of the threshold.
//
// * Any other value v (1 to 254) means the threshold is 0.5*2^(v-243).
//
// If reading from br fails, the error is returned together with a zero
// threshold.
func readZeroThreshold(br *bstreamReader) (float64, error) {
	b, err := br.ReadByte()
	if err != nil {
		return 0, err
	}
	switch b {
	case 0:
		return 0, nil
	case 255:
		v, err := br.readBits(64)
		if err != nil {
			return 0, err
		}
		return math.Float64frombits(v), nil
	default:
		// Widen to int before subtracting. Subtracting in byte arithmetic
		// wraps around for every b < 243 and yields a large positive
		// exponent instead of the intended negative one.
		return math.Ldexp(0.5, int(b)-243), nil
	}
}
type bucketIterator struct {
spans []histogram.Span
span int // Span position of last yielded bucket.

View File

@ -14,46 +14,14 @@
package chunkenc
import (
"math"
"math/bits"
"github.com/pkg/errors"
)
// putVarbitFloat writes a float64 using varbit encoding: the IEEE 754 bit
// pattern of val is reinterpreted as an int64 and handed to putVarbitInt.
//
// TODO(beorn7): The resulting int64 will almost never be a small integer, so
// the varbit encoding doesn't really make sense here. This function is only
// used to encode the zero threshold in histograms. Based on that, here is an
// idea to improve the encoding:
//
// It is recommended to use (usually negative) powers of two as thresholds.
// The default value for the zero threshold is in fact 2^-128, or 0.5*2^-127,
// as it is represented by IEEE 754. It is therefore worth a try to test if
// the threshold is a power of 2 and then just store the exponent. 0 is also a
// common threshold for those use cases where only observations of precisely
// zero should go to the zero bucket. This results in the following proposal:
//   - First we store 1 byte.
//   - Iff that byte is 255 (all bits set), it is followed by a direct
//     8byte representation of the float.
//   - If the byte is 0, the threshold is 0.
//   - In all other cases, take the number represented by the byte,
//     subtract 246, and that's the exponent (i.e. between -245 and
//     +8, covering thresholds that are powers of 2 between 2^-246
//     to 128).
func putVarbitFloat(b *bstream, val float64) {
	raw := math.Float64bits(val)
	putVarbitInt(b, int64(raw))
}
// readVarbitFloat reads a float64 encoded with putVarbitFloat: it reads an
// int64 with readVarbitInt and reinterprets its bits as a float64. If the
// read fails, it returns 0 and the error.
func readVarbitFloat(b *bstreamReader) (float64, error) {
	raw, err := readVarbitInt(b)
	if err == nil {
		return math.Float64frombits(uint64(raw)), nil
	}
	return 0, err
}
// putVarbitInt writes an int64 using varbit encoding with a bit bucketing
// optimized for the dod's observed in histogram buckets.
// optimized for the dod's observed in histogram buckets, plus a few additional
// buckets for large numbers.
//
// TODO(Dieterbe): We could improve this further: Each branch doesn't need to
// support any values of any of the prior branches. So we can expand the range
@ -62,22 +30,31 @@ func readVarbitFloat(b *bstreamReader) (float64, error) {
// center-piece we skip).
func putVarbitInt(b *bstream, val int64) {
switch {
case val == 0:
case val == 0: // Precisely 0, needs 1 bit.
b.writeBit(zero)
case bitRange(val, 3): // -3 <= val <= 4
case bitRange(val, 3): // -3 <= val <= 4, needs 5 bits.
b.writeBits(0b10, 2)
b.writeBits(uint64(val), 3)
case bitRange(val, 6): // -31 <= val <= 32
case bitRange(val, 6): // -31 <= val <= 32, 9 bits.
b.writeBits(0b110, 3)
b.writeBits(uint64(val), 6)
case bitRange(val, 9): // -255 <= val <= 256
case bitRange(val, 9): // -255 <= val <= 256, 13 bits.
b.writeBits(0b1110, 4)
b.writeBits(uint64(val), 9)
case bitRange(val, 12): // -2047 <= val <= 2048
case bitRange(val, 12): // -2047 <= val <= 2048, 17 bits.
b.writeBits(0b11110, 5)
b.writeBits(uint64(val), 12)
case bitRange(val, 18): // -131071 <= val <= 131072, 3 bytes.
b.writeBits(0b111110, 6)
b.writeBits(uint64(val), 18)
case bitRange(val, 25): // -16777215 <= val <= 16777216, 4 bytes.
b.writeBits(0b1111110, 7)
b.writeBits(uint64(val), 25)
case bitRange(val, 56): // -36028797018963967 <= val <= 36028797018963968, 8 bytes.
b.writeBits(0b11111110, 8)
b.writeBits(uint64(val), 56)
default:
b.writeBits(0b11111, 5)
b.writeBits(0b11111111, 8) // Worst case, needs 9 bytes.
b.writeBits(uint64(val), 64)
}
}
@ -85,7 +62,7 @@ func putVarbitInt(b *bstream, val int64) {
// readVarbitInt reads an int64 encoded with putVarbitInt.
func readVarbitInt(b *bstreamReader) (int64, error) {
var d byte
for i := 0; i < 5; i++ {
for i := 0; i < 8; i++ {
d <<= 1
bit, err := b.readBitFast()
if err != nil {
@ -114,7 +91,13 @@ func readVarbitInt(b *bstreamReader) (int64, error) {
sz = 9
case 0b11110:
sz = 12
case 0b11111:
case 0b111110:
sz = 18
case 0b1111110:
sz = 25
case 0b11111110:
sz = 56
case 0b11111111:
// Do not use fast because it's very unlikely it will succeed.
bits, err := b.readBits(64)
if err != nil {
@ -122,6 +105,8 @@ func readVarbitInt(b *bstreamReader) (int64, error) {
}
val = int64(bits)
default:
return 0, errors.Errorf("invalid bit pattern %b", d)
}
if sz != 0 {
@ -141,3 +126,104 @@ func readVarbitInt(b *bstreamReader) (int64, error) {
return val, nil
}
// bitRangeUint reports whether x can be represented with at most nbits bits,
// i.e. whether the minimum number of bits needed for x does not exceed nbits.
func bitRangeUint(x uint64, nbits int) bool {
	return bits.Len64(x) <= nbits
}
// putVarbitUint writes a uint64 using varbit encoding. It uses the same bit
// buckets as putVarbitInt.
//
// A value of 0 is written as a single zero bit. Every other value is written
// as a prefix of one-bits (terminated by a zero bit unless the prefix is
// eight bits long) followed by the value itself in the payload width selected
// by the prefix.
func putVarbitUint(b *bstream, val uint64) {
	var (
		prefix     uint64
		prefixLen  int
		payloadLen int
	)
	// Pick the smallest bucket whose payload width can hold val.
	switch n := bits.Len64(val); {
	case n == 0: // Precisely 0, needs 1 bit.
		b.writeBit(zero)
		return
	case n <= 3: // val <= 7, needs 5 bits.
		prefix, prefixLen, payloadLen = 0b10, 2, 3
	case n <= 6: // val <= 63, 9 bits.
		prefix, prefixLen, payloadLen = 0b110, 3, 6
	case n <= 9: // val <= 511, 13 bits.
		prefix, prefixLen, payloadLen = 0b1110, 4, 9
	case n <= 12: // val <= 4095, 17 bits.
		prefix, prefixLen, payloadLen = 0b11110, 5, 12
	case n <= 18: // val <= 262143, 3 bytes.
		prefix, prefixLen, payloadLen = 0b111110, 6, 18
	case n <= 25: // val <= 33554431, 4 bytes.
		prefix, prefixLen, payloadLen = 0b1111110, 7, 25
	case n <= 56: // val <= 72057594037927935, 8 bytes.
		prefix, prefixLen, payloadLen = 0b11111110, 8, 56
	default: // Worst case, needs 9 bytes.
		prefix, prefixLen, payloadLen = 0b11111111, 8, 64
	}
	b.writeBits(prefix, prefixLen)
	b.writeBits(val, payloadLen)
}
// readVarbitUint reads a uint64 encoded with putVarbitUint.
//
// It first reads the prefix, i.e. up to eight bits, stopping after the first
// zero bit. The prefix selects the width of the payload that follows. Any
// read error is returned together with a zero value.
func readVarbitUint(b *bstreamReader) (uint64, error) {
	var prefix byte
	for n := 0; n < 8; n++ {
		prefix <<= 1
		next, err := b.readBitFast()
		if err != nil {
			// The fast path failed; retry with the regular read.
			next, err = b.readBit()
		}
		if err != nil {
			return 0, err
		}
		if next == zero {
			break
		}
		prefix |= 1
	}

	var width uint8
	switch prefix {
	case 0b0:
		// val == 0
		return 0, nil
	case 0b10:
		width = 3
	case 0b110:
		width = 6
	case 0b1110:
		width = 9
	case 0b11110:
		width = 12
	case 0b111110:
		width = 18
	case 0b1111110:
		width = 25
	case 0b11111110:
		width = 56
	case 0b11111111:
		// Do not use fast because it's very unlikely it will succeed.
		full, err := b.readBits(64)
		if err != nil {
			return 0, err
		}
		return full, nil
	default:
		return 0, errors.Errorf("invalid bit pattern %b", prefix)
	}

	payload, err := b.readBitsFast(width)
	if err != nil {
		payload, err = b.readBits(width)
	}
	if err != nil {
		return 0, err
	}
	return payload, nil
}

View File

@ -0,0 +1,85 @@
// Copyright 2021 The Prometheus Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package chunkenc
import (
"math"
"testing"
"github.com/stretchr/testify/require"
)
// TestVarbitInt writes a series of int64 values with putVarbitInt into one
// stream and checks that readVarbitInt returns the same values in the same
// order. The values include math.MinInt64, math.MaxInt64, zero, and numbers
// around powers of two on both sides of zero.
func TestVarbitInt(t *testing.T) {
	inputs := []int64{
		math.MinInt64,
		-36028797018963968, -36028797018963967,
		-16777216, -16777215,
		-131072, -131071,
		-2048, -2047,
		-256, -255,
		-32, -31,
		-4, -3,
		-1, 0, 1,
		4, 5,
		32, 33,
		256, 257,
		2048, 2049,
		131072, 131073,
		16777216, 16777217,
		36028797018963968, 36028797018963969,
		math.MaxInt64,
	}

	var stream bstream
	for i := range inputs {
		putVarbitInt(&stream, inputs[i])
	}

	reader := newBReader(stream.bytes())
	for _, expected := range inputs {
		actual, err := readVarbitInt(&reader)
		require.NoError(t, err)
		require.Equal(t, expected, actual)
	}
}
// TestVarbitUint writes a series of uint64 values with putVarbitUint into one
// stream and checks that readVarbitUint returns the same values in the same
// order. The values are the largest number of each bit bucket, its successor,
// and math.MaxUint64.
func TestVarbitUint(t *testing.T) {
	inputs := []uint64{
		0, 1,
		7, 8,
		63, 64,
		511, 512,
		4095, 4096,
		262143, 262144,
		33554431, 33554432,
		72057594037927935, 72057594037927936,
		math.MaxUint64,
	}

	var stream bstream
	for i := range inputs {
		putVarbitUint(&stream, inputs[i])
	}

	reader := newBReader(stream.bytes())
	for _, expected := range inputs {
		actual, err := readVarbitUint(&reader)
		require.NoError(t, err)
		require.Equal(t, expected, actual)
	}
}

View File

@ -111,7 +111,7 @@ func (c *XORChunk) Appender() (Appender, error) {
leading: it.leading,
trailing: it.trailing,
}
if binary.BigEndian.Uint16(a.b.bytes()) == 0 {
if it.numTotal == 0 {
a.leading = 0xff
}
return a, nil

View File

@ -34,22 +34,62 @@ in-file offset (lower 4 bytes) and segment sequence number (upper 4 bytes).
└───────────────┴───────────────────┴──────────────┴────────────────┘
```
## XOR chunk
Notes:
* `<uvarint>` has 1 to 10 bytes.
* `encoding`: Currently either `XOR` or `histogram`.
* `data`: See below for each encoding.
TODO(beorn7): Add.
## Histogram chunk
TODO(beorn7): This is out of date. Update once settled on the (more or less) final format.
## XOR chunk data
```
┌──────────────┬─────────────────┬──────────────────────────┬──────────────────────────┬──────────────┐
│ len <uint16> │ schema <varint> │ pos-spans <span-section> │ neg-spans <span-section> │ data <bytes> │
└──────────────┴─────────────────┴──────────────────────────┴──────────────────────────┴──────────────┘
span-section:
┌──────────────┬──────────────────┬──────────────────┬────────────┐
│ len <varint> │ length1 <varint> │ offset1 <varint> │ length2... │
└──────────────┴──────────────────┴──────────────────┴────────────┘
┌──────────────────────┬───────────────┬───────────────┬──────────────────────┬──────────────────────┬──────────────────────┬──────────────────────┬─────┐
│ num_samples <uint16> │ ts_0 <varint> │ v_0 <float64> │ ts_1_delta <uvarint> │ v_1_xor <varbit_xor> │ ts_n_dod <varbit_ts> │ v_n_xor <varbit_xor> │ ... │
└──────────────────────┴───────────────┴───────────────┴──────────────────────┴──────────────────────┴──────────────────────┴──────────────────────┴─────┘
```
### Notes:
* `ts` is the timestamp, `v` is the value.
* `...` means to repeat the previous two fields as needed, with `n` starting at 2 and going up to `num_samples` – 1.
* `<uint16>` has 2 bytes in big-endian order.
* `<varint>` and `<uvarint>` have 1 to 10 bytes each.
* `ts_1_delta` is `ts_1` – `ts_0`.
* `ts_n_dod` is the “delta of deltas” of timestamps, i.e. (`ts_n` – `ts_n-1`) – (`ts_n-1` – `ts_n-2`).
* `<v_n_xor>` is the result of `v_n` XOR `v_n-1`.
* `<varbit_xor>` is a specific variable bitwidth encoding of the result of XORing the current and the previous value. It has between 1 bit and 77 bits.
See [code for details](https://github.com/prometheus/prometheus/blob/7309c20e7e5774e7838f183ec97c65baa4362edc/tsdb/chunkenc/xor.go#L220-L253).
* `<varbit_ts>` is a specific variable bitwidth encoding for the “delta of deltas” of timestamps (signed integers that are ideally small).
It has between 1 and 68 bits.
See [code for details](https://github.com/prometheus/prometheus/blob/7309c20e7e5774e7838f183ec97c65baa4362edc/tsdb/chunkenc/xor.go#L179-L205).
## Histogram chunk data
```
┌──────────────────────┬───────────────────────────────┬─────────────────────┬──────────────────┬──────────────────┬────────────────┐
│ num_samples <uint16> │ zero_threshold <1 or 9 bytes> │ schema <varbit_int> │ pos_spans <data> │ neg_spans <data> │ samples <data> │
└──────────────────────┴───────────────────────────────┴─────────────────────┴──────────────────┴──────────────────┴────────────────┘
```
### Positive and negative spans data:
```
┌───────────────────┬────────────────────────┬───────────────────────┬─────┬──────────────────────────┬─────────────────────────┐
│ num <varbit_uint> │ length_1 <varbit_uint> │ offset_1 <varbit_int> │ ... │ length_num <varbit_uint> │ offset_num <varbit_int> │
└───────────────────┴────────────────────────┴───────────────────────┴─────┴──────────────────────────┴─────────────────────────┘
```
### Samples data:
```
TODO
```
### Notes:
* `zero_threshold` has a specific encoding:
* If 0, it is a single zero byte.
* If a power of two between 2^-243 and 2^10, it is a single byte between 1 and 254.
* Otherwise, it is a byte with all bits set (255), followed by a float64, resulting in 9 bytes length.
* `schema` is a specific value defined by the exposition format. Currently valid values are -4 <= n <= 8.
* `<varbit_int>` is a variable bitwidth encoding for signed integers, optimized for “delta of deltas” of bucket deltas. It has between 1 bit and 9 bytes.
* `<varbit_uint>` is a variable bitwidth encoding for unsigned integers with the same bit-bucketing as `<varbit_int>`.