mirror of
https://github.com/prometheus/prometheus
synced 2025-01-20 22:50:49 +00:00
Support storing the zero threshold in the histogram chunk (#9165)
Signed-off-by: Ganesh Vernekar <ganeshvern@gmail.com>
This commit is contained in:
parent
7026e6b4e4
commit
19e98e5469
@ -97,7 +97,7 @@ func (c *HistoChunk) NumSamples() int {
|
||||
|
||||
// Meta returns the histogram metadata.
|
||||
// callers may only call this on chunks that have at least one sample
|
||||
func (c *HistoChunk) Meta() (int32, []histogram.Span, []histogram.Span, error) {
|
||||
func (c *HistoChunk) Meta() (int32, float64, []histogram.Span, []histogram.Span, error) {
|
||||
if c.NumSamples() == 0 {
|
||||
panic("HistoChunk.Meta() called on an empty chunk")
|
||||
}
|
||||
@ -131,6 +131,7 @@ func (c *HistoChunk) Appender() (Appender, error) {
|
||||
b: &c.b,
|
||||
|
||||
schema: it.schema,
|
||||
zeroThreshold: it.zeroThreshold,
|
||||
posSpans: it.posSpans,
|
||||
negSpans: it.negSpans,
|
||||
t: it.t,
|
||||
@ -199,6 +200,7 @@ type HistoAppender struct {
|
||||
|
||||
// Metadata:
|
||||
schema int32
|
||||
zeroThreshold float64
|
||||
posSpans, negSpans []histogram.Span
|
||||
|
||||
// For the fields that are tracked as dod's. Note that we expect to
|
||||
@ -245,8 +247,7 @@ func (a *HistoAppender) Append(int64, float64) {}
|
||||
// * the zerobucket threshold has changed
|
||||
// * any buckets disappeared
|
||||
func (a *HistoAppender) Appendable(h histogram.SparseHistogram) ([]Interjection, []Interjection, bool) {
|
||||
// TODO zerothreshold
|
||||
if h.Schema != a.schema {
|
||||
if h.Schema != a.schema || h.ZeroThreshold != a.zeroThreshold {
|
||||
return nil, nil, false
|
||||
}
|
||||
posInterjections, ok := compareSpans(a.posSpans, h.PositiveSpans)
|
||||
@ -273,8 +274,9 @@ func (a *HistoAppender) AppendHistogram(t int64, h histogram.SparseHistogram) {
|
||||
// the first append gets the privilege to dictate the metadata
|
||||
// but it's also responsible for encoding it into the chunk!
|
||||
|
||||
writeHistoChunkMeta(a.b, h.Schema, h.PositiveSpans, h.NegativeSpans)
|
||||
writeHistoChunkMeta(a.b, h.Schema, h.ZeroThreshold, h.PositiveSpans, h.NegativeSpans)
|
||||
a.schema = h.Schema
|
||||
a.zeroThreshold = h.ZeroThreshold
|
||||
a.posSpans, a.negSpans = h.PositiveSpans, h.NegativeSpans
|
||||
numPosBuckets, numNegBuckets := countSpans(h.PositiveSpans), countSpans(h.NegativeSpans)
|
||||
a.posbuckets = make([]int64, numPosBuckets)
|
||||
@ -442,6 +444,7 @@ type histoIterator struct {
|
||||
|
||||
// Metadata:
|
||||
schema int32
|
||||
zeroThreshold float64
|
||||
posSpans, negSpans []histogram.Span
|
||||
|
||||
// For the fields that are tracked as dod's.
|
||||
@ -486,7 +489,7 @@ func (it *histoIterator) AtHistogram() (int64, histogram.SparseHistogram) {
|
||||
Count: it.cnt,
|
||||
ZeroCount: it.zcnt,
|
||||
Sum: it.sum,
|
||||
ZeroThreshold: 0, // TODO
|
||||
ZeroThreshold: it.zeroThreshold,
|
||||
Schema: it.schema,
|
||||
PositiveSpans: it.posSpans,
|
||||
NegativeSpans: it.negSpans,
|
||||
@ -532,12 +535,13 @@ func (it *histoIterator) Next() bool {
|
||||
if it.numRead == 0 {
|
||||
|
||||
// first read is responsible for reading chunk metadata and initializing fields that depend on it
|
||||
schema, posSpans, negSpans, err := readHistoChunkMeta(&it.br)
|
||||
schema, zeroThreshold, posSpans, negSpans, err := readHistoChunkMeta(&it.br)
|
||||
if err != nil {
|
||||
it.err = err
|
||||
return false
|
||||
}
|
||||
it.schema = schema
|
||||
it.zeroThreshold = zeroThreshold
|
||||
it.posSpans, it.negSpans = posSpans, negSpans
|
||||
numPosBuckets, numNegBuckets := countSpans(posSpans), countSpans(negSpans)
|
||||
it.posbuckets = make([]int64, numPosBuckets)
|
||||
|
@ -13,10 +13,13 @@
|
||||
|
||||
package chunkenc
|
||||
|
||||
import "github.com/prometheus/prometheus/pkg/histogram"
|
||||
import (
|
||||
"github.com/prometheus/prometheus/pkg/histogram"
|
||||
)
|
||||
|
||||
func writeHistoChunkMeta(b *bstream, schema int32, posSpans, negSpans []histogram.Span) {
|
||||
func writeHistoChunkMeta(b *bstream, schema int32, zeroThreshold float64, posSpans, negSpans []histogram.Span) {
|
||||
putInt64VBBucket(b, int64(schema))
|
||||
putFloat64VBBucket(b, zeroThreshold)
|
||||
putHistoChunkMetaSpans(b, posSpans)
|
||||
putHistoChunkMetaSpans(b, negSpans)
|
||||
}
|
||||
@ -29,25 +32,29 @@ func putHistoChunkMetaSpans(b *bstream, spans []histogram.Span) {
|
||||
}
|
||||
}
|
||||
|
||||
func readHistoChunkMeta(b *bstreamReader) (int32, []histogram.Span, []histogram.Span, error) {
|
||||
|
||||
func readHistoChunkMeta(b *bstreamReader) (int32, float64, []histogram.Span, []histogram.Span, error) {
|
||||
v, err := readInt64VBBucket(b)
|
||||
if err != nil {
|
||||
return 0, nil, nil, err
|
||||
return 0, 0, nil, nil, err
|
||||
}
|
||||
schema := int32(v)
|
||||
|
||||
zeroThreshold, err := readFloat64VBBucket(b)
|
||||
if err != nil {
|
||||
return 0, 0, nil, nil, err
|
||||
}
|
||||
|
||||
posSpans, err := readHistoChunkMetaSpans(b)
|
||||
if err != nil {
|
||||
return 0, nil, nil, err
|
||||
return 0, 0, nil, nil, err
|
||||
}
|
||||
|
||||
negSpans, err := readHistoChunkMetaSpans(b)
|
||||
if err != nil {
|
||||
return 0, nil, nil, err
|
||||
return 0, 0, nil, nil, err
|
||||
}
|
||||
|
||||
return schema, posSpans, negSpans, nil
|
||||
return schema, zeroThreshold, posSpans, negSpans, nil
|
||||
}
|
||||
|
||||
func readHistoChunkMetaSpans(b *bstreamReader) ([]histogram.Span, error) {
|
||||
|
@ -31,11 +31,11 @@ func TestHistoChunkSameBuckets(t *testing.T) {
|
||||
|
||||
ts := int64(1234567890)
|
||||
h := histogram.SparseHistogram{
|
||||
Count: 5,
|
||||
ZeroCount: 2,
|
||||
Sum: 18.4,
|
||||
//ZeroThreshold: 1, TODO
|
||||
Schema: 1,
|
||||
Count: 5,
|
||||
ZeroCount: 2,
|
||||
Sum: 18.4,
|
||||
ZeroThreshold: 1e-100,
|
||||
Schema: 1,
|
||||
PositiveSpans: []histogram.Span{
|
||||
{Offset: 0, Length: 2},
|
||||
{Offset: 1, Length: 2},
|
||||
@ -129,11 +129,11 @@ func TestHistoChunkBucketChanges(t *testing.T) {
|
||||
|
||||
ts1 := int64(1234567890)
|
||||
h1 := histogram.SparseHistogram{
|
||||
Count: 5,
|
||||
ZeroCount: 2,
|
||||
Sum: 18.4,
|
||||
//ZeroThreshold: 1, TODO
|
||||
Schema: 1,
|
||||
Count: 5,
|
||||
ZeroCount: 2,
|
||||
Sum: 18.4,
|
||||
ZeroThreshold: 1e-125,
|
||||
Schema: 1,
|
||||
PositiveSpans: []histogram.Span{
|
||||
{Offset: 0, Length: 2},
|
||||
{Offset: 2, Length: 1},
|
||||
|
@ -43,6 +43,28 @@
|
||||
|
||||
package chunkenc
|
||||
|
||||
import (
|
||||
"math"
|
||||
)
|
||||
|
||||
// putFloat64VBBucket writes a float64 using varbit optimized for SHS buckets.
|
||||
// It does so by converting the underlying bits into an int64.
|
||||
func putFloat64VBBucket(b *bstream, val float64) {
|
||||
// TODO: Since this is used for the zero threshold, this almost always goes into the default
|
||||
// bit range (i.e. using 5+64 bits). So we can consider skipping `putInt64VBBucket` and directly
|
||||
// write the float and save 5 bits here.
|
||||
putInt64VBBucket(b, int64(math.Float64bits(val)))
|
||||
}
|
||||
|
||||
// readFloat64VBBucket reads a float64 using varbit optimized for SHS buckets
|
||||
func readFloat64VBBucket(b *bstreamReader) (float64, error) {
|
||||
val, err := readInt64VBBucket(b)
|
||||
if err != nil {
|
||||
return 0, err
|
||||
}
|
||||
return math.Float64frombits(uint64(val)), nil
|
||||
}
|
||||
|
||||
// putInt64VBBucket writes an int64 using varbit optimized for SHS buckets.
|
||||
//
|
||||
// TODO(Dieterbe): We could improve this further: Each branch doesn't need to
|
||||
@ -55,19 +77,19 @@ func putInt64VBBucket(b *bstream, val int64) {
|
||||
case val == 0:
|
||||
b.writeBit(zero)
|
||||
case bitRange(val, 3): // -3 <= val <= 4
|
||||
b.writeBits(0x02, 2) // '10'
|
||||
b.writeBits(0b10, 2)
|
||||
b.writeBits(uint64(val), 3)
|
||||
case bitRange(val, 6): // -31 <= val <= 32
|
||||
b.writeBits(0x06, 3) // '110'
|
||||
b.writeBits(0b110, 3)
|
||||
b.writeBits(uint64(val), 6)
|
||||
case bitRange(val, 9): // -255 <= val <= 256
|
||||
b.writeBits(0x0e, 4) // '1110'
|
||||
b.writeBits(0b1110, 4)
|
||||
b.writeBits(uint64(val), 9)
|
||||
case bitRange(val, 12): // -2047 <= val <= 2048
|
||||
b.writeBits(0x1e, 5) // '11110'
|
||||
b.writeBits(0b11110, 5)
|
||||
b.writeBits(uint64(val), 12)
|
||||
default:
|
||||
b.writeBits(0x3e, 5) // '11111'
|
||||
b.writeBits(0b11111, 5)
|
||||
b.writeBits(uint64(val), 64)
|
||||
}
|
||||
}
|
||||
@ -94,17 +116,17 @@ func readInt64VBBucket(b *bstreamReader) (int64, error) {
|
||||
var sz uint8
|
||||
|
||||
switch d {
|
||||
case 0x00:
|
||||
case 0b0:
|
||||
// val == 0
|
||||
case 0x02: // '10'
|
||||
case 0b10:
|
||||
sz = 3
|
||||
case 0x06: // '110'
|
||||
case 0b110:
|
||||
sz = 6
|
||||
case 0x0e: // '1110'
|
||||
case 0b1110:
|
||||
sz = 9
|
||||
case 0x1e: // '11110'
|
||||
case 0b11110:
|
||||
sz = 12
|
||||
case 0x3e: // '11111'
|
||||
case 0b11111:
|
||||
// Do not use fast because it's very unlikely it will succeed.
|
||||
bits, err := b.readBits(64)
|
||||
if err != nil {
|
||||
|
Loading…
Reference in New Issue
Block a user