enhance FloatHistogram CopyToSchema method (#12596)
histogram: Improve performance of FloatHistogram.CopyToSchema Signed-off-by: Ziqi Zhao <zhaoziqi9146@gmail.com> --------- Signed-off-by: Ziqi Zhao <zhaoziqi9146@gmail.com> Co-authored-by: Björn Rabenstein <github@rabenste.in>
This commit is contained in:
parent
b1a6d003d1
commit
ed1b307bca
|
@ -93,26 +93,8 @@ func (h *FloatHistogram) CopyToSchema(targetSchema int32) *FloatHistogram {
|
||||||
Sum: h.Sum,
|
Sum: h.Sum,
|
||||||
}
|
}
|
||||||
|
|
||||||
// TODO(beorn7): This is a straight-forward implementation using merging
|
c.PositiveSpans, c.PositiveBuckets = mergeToSchema(h.PositiveSpans, h.PositiveBuckets, h.Schema, targetSchema)
|
||||||
// iterators for the original buckets and then adding one merged bucket
|
c.NegativeSpans, c.NegativeBuckets = mergeToSchema(h.NegativeSpans, h.NegativeBuckets, h.Schema, targetSchema)
|
||||||
// after another to the newly created FloatHistogram. It's well possible
|
|
||||||
// that a more involved implementation performs much better, which we
|
|
||||||
// could do if this code path turns out to be performance-critical.
|
|
||||||
var iInSpan, index int32
|
|
||||||
for iSpan, iBucket, it := -1, -1, h.floatBucketIterator(true, 0, targetSchema); it.Next(); {
|
|
||||||
b := it.At()
|
|
||||||
c.PositiveSpans, c.PositiveBuckets, iSpan, iBucket, iInSpan = addBucket(
|
|
||||||
b, c.PositiveSpans, c.PositiveBuckets, iSpan, iBucket, iInSpan, index,
|
|
||||||
)
|
|
||||||
index = b.Index
|
|
||||||
}
|
|
||||||
for iSpan, iBucket, it := -1, -1, h.floatBucketIterator(false, 0, targetSchema); it.Next(); {
|
|
||||||
b := it.At()
|
|
||||||
c.NegativeSpans, c.NegativeBuckets, iSpan, iBucket, iInSpan = addBucket(
|
|
||||||
b, c.NegativeSpans, c.NegativeBuckets, iSpan, iBucket, iInSpan, index,
|
|
||||||
)
|
|
||||||
index = b.Index
|
|
||||||
}
|
|
||||||
|
|
||||||
return &c
|
return &c
|
||||||
}
|
}
|
||||||
|
@ -982,3 +964,72 @@ func (i *allFloatBucketIterator) Next() bool {
|
||||||
func (i *allFloatBucketIterator) At() Bucket[float64] {
|
func (i *allFloatBucketIterator) At() Bucket[float64] {
|
||||||
return i.currBucket
|
return i.currBucket
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// targetIdx returns the bucket index in the target schema for the given bucket
|
||||||
|
// index idx in the original schema.
|
||||||
|
func targetIdx(idx, originSchema, targetSchema int32) int32 {
|
||||||
|
return ((idx - 1) >> (originSchema - targetSchema)) + 1
|
||||||
|
}
|
||||||
|
|
||||||
|
// mergeToSchema is used to merge a FloatHistogram's Spans and Buckets (no matter if
|
||||||
|
// positive or negative) from the original schema to the target schema.
|
||||||
|
// The target schema must be smaller than the original schema.
|
||||||
|
func mergeToSchema(originSpans []Span, originBuckets []float64, originSchema, targetSchema int32) ([]Span, []float64) {
|
||||||
|
var (
|
||||||
|
targetSpans []Span // The spans in the target schema.
|
||||||
|
targetBuckets []float64 // The buckets in the target schema.
|
||||||
|
bucketIdx int32 // The index of bucket in the origin schema.
|
||||||
|
lastTargetBucketIdx int32 // The index of the last added target bucket.
|
||||||
|
origBucketIdx int // The position of a bucket in originBuckets slice.
|
||||||
|
)
|
||||||
|
|
||||||
|
for _, span := range originSpans {
|
||||||
|
// Determine the index of the first bucket in this span.
|
||||||
|
bucketIdx += span.Offset
|
||||||
|
for j := 0; j < int(span.Length); j++ {
|
||||||
|
// Determine the index of the bucket in the target schema from the index in the original schema.
|
||||||
|
targetBucketIdx := targetIdx(bucketIdx, originSchema, targetSchema)
|
||||||
|
|
||||||
|
switch {
|
||||||
|
case len(targetSpans) == 0:
|
||||||
|
// This is the first span in the targetSpans.
|
||||||
|
span := Span{
|
||||||
|
Offset: targetBucketIdx,
|
||||||
|
Length: 1,
|
||||||
|
}
|
||||||
|
targetSpans = append(targetSpans, span)
|
||||||
|
targetBuckets = append(targetBuckets, originBuckets[0])
|
||||||
|
lastTargetBucketIdx = targetBucketIdx
|
||||||
|
|
||||||
|
case lastTargetBucketIdx == targetBucketIdx:
|
||||||
|
// The current bucket has to be merged into the same target bucket as the previous bucket.
|
||||||
|
targetBuckets[len(targetBuckets)-1] += originBuckets[origBucketIdx]
|
||||||
|
|
||||||
|
case (lastTargetBucketIdx + 1) == targetBucketIdx:
|
||||||
|
// The current bucket has to go into a new target bucket,
|
||||||
|
// and that bucket is next to the previous target bucket,
|
||||||
|
// so we add it to the current target span.
|
||||||
|
targetSpans[len(targetSpans)-1].Length++
|
||||||
|
targetBuckets = append(targetBuckets, originBuckets[origBucketIdx])
|
||||||
|
lastTargetBucketIdx++
|
||||||
|
|
||||||
|
case (lastTargetBucketIdx + 1) < targetBucketIdx:
|
||||||
|
// The current bucket has to go into a new target bucket,
|
||||||
|
// and that bucket is separated by a gap from the previous target bucket,
|
||||||
|
// so we need to add a new target span.
|
||||||
|
span := Span{
|
||||||
|
Offset: targetBucketIdx - lastTargetBucketIdx - 1,
|
||||||
|
Length: 1,
|
||||||
|
}
|
||||||
|
targetSpans = append(targetSpans, span)
|
||||||
|
targetBuckets = append(targetBuckets, originBuckets[origBucketIdx])
|
||||||
|
lastTargetBucketIdx = targetBucketIdx
|
||||||
|
}
|
||||||
|
|
||||||
|
bucketIdx++
|
||||||
|
origBucketIdx++
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return targetSpans, targetBuckets
|
||||||
|
}
|
||||||
|
|
Loading…
Reference in New Issue