Compare block sizes with sparse histograms (#9045)

Signed-off-by: Ganesh Vernekar <ganeshvern@gmail.com>
Ganesh Vernekar 2021-07-08 11:01:53 +05:30 committed by GitHub
parent 08258cc539
commit 79305e704b
1 changed file with 313 additions and 0 deletions


@@ -18,9 +18,11 @@ import (
    "fmt"
    "io/ioutil"
    "math"
    "math/rand"
    "os"
    "path"
    "path/filepath"
    "sync"
    "testing"
    "time"
@@ -1377,3 +1379,314 @@ func TestHeadCompactionWithHistograms(t *testing.T) {
    require.Equal(t, expHists, actHists)
}

// Depending on numSeriesPerSchema, this test can take a few GiB of memory;
// to make it run faster, all samples are added to the appender before
// committing, instead of committing in smaller batches.
func TestSparseHistoSpaceSavings(t *testing.T) {
    t.Skip()
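    // To reproduce the numbers, remove the t.Skip() above and run the test
    // directly, e.g. with `go test -run TestSparseHistoSpaceSavings -v`
    // (assuming this file sits in the tsdb package of the Prometheus repo).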

    cases := []struct {
        numSeriesPerSchema int
        numBuckets         int
        numSpans           int
        gapBetweenSpans    int
    }{
        {1, 15, 1, 0},
        {1, 50, 1, 0},
        {1, 100, 1, 0},
        {1, 15, 3, 5},
        {1, 50, 3, 3},
        {1, 100, 3, 2},
        {100, 15, 1, 0},
        {100, 50, 1, 0},
        {100, 100, 1, 0},
        {100, 15, 3, 5},
        {100, 50, 3, 3},
        {100, 100, 3, 2},
        //{1000, 15, 1, 0},
        //{1000, 50, 1, 0},
        //{1000, 100, 1, 0},
        //{1000, 15, 3, 5},
        //{1000, 50, 3, 3},
        //{1000, 100, 3, 2},
    }
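
    // Each case produces two blocks from identical data: one with
    // conventional le-labelled series ("old") and one with native sparse
    // histograms. Their index, chunks, and total sizes are recorded here.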
    type testSummary struct {
        oldBlockTotalSeries    int
        oldBlockIndexSize      int64
        oldBlockChunksSize     int64
        oldBlockTotalSize      int64
        sparseBlockTotalSeries int
        sparseBlockIndexSize   int64
        sparseBlockChunksSize  int64
        sparseBlockTotalSize   int64
        numBuckets             int
        numSpans               int
        gapBetweenSpans        int
    }
    var summaries []testSummary

    allSchemas := []int{-4, -3, -2, -1, 0, 1, 2, 3, 4, 5, 6, 7, 8}
    schemaDescription := []string{"minus_4", "minus_3", "minus_2", "minus_1", "0", "1", "2", "3", "4", "5", "6", "7", "8"}
    numHistograms := 120 * 4 // 480 samples, i.e. one 2h block at a 15s scrape interval.
    timeStep := DefaultBlockDuration / int64(numHistograms)
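
    // One sub-test per case: both blocks are built concurrently from the same
    // generated data, compacted, and measured.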
    for _, c := range cases {
        t.Run(
            fmt.Sprintf("series=%d,span=%d,gap=%d,buckets=%d",
                len(allSchemas)*c.numSeriesPerSchema,
                c.numSpans,
                c.gapBetweenSpans,
                c.numBuckets,
            ),
            func(t *testing.T) {
                oldHead, _ := newTestHead(t, DefaultBlockDuration, false)
                t.Cleanup(func() {
                    require.NoError(t, oldHead.Close())
                })
                sparseHead, _ := newTestHead(t, DefaultBlockDuration, false)
                t.Cleanup(func() {
                    require.NoError(t, sparseHead.Close())
                })
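
                // Pre-generate all histogram series once so that both
                // ingestion paths below append exactly the same data.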
                var allSparseSeries []struct {
                    baseLabels labels.Labels
                    hists      []histogram.SparseHistogram
                }

                for sid, schema := range allSchemas {
                    for i := 0; i < c.numSeriesPerSchema; i++ {
                        lbls := labels.Labels{
                            {Name: "__name__", Value: fmt.Sprintf("rpc_durations_%d_histogram_seconds", i)},
                            {Name: "instance", Value: "localhost:8080"},
                            {Name: "job", Value: fmt.Sprintf("sparse_histogram_schema_%s", schemaDescription[sid])},
                        }
                        allSparseSeries = append(allSparseSeries, struct {
                            baseLabels labels.Labels
                            hists      []histogram.SparseHistogram
                        }{baseLabels: lbls, hists: generateCustomHistograms(numHistograms, c.numBuckets, c.numSpans, c.gapBetweenSpans, schema)})
                    }
                }
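
                // Ingest the same series into both heads concurrently: one
                // goroutine appends native sparse histograms, the other
                // expands each histogram into conventional cumulative
                // le-labelled series plus _count and _sum.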
                oldApp := oldHead.Appender(context.Background())
                sparseApp := sparseHead.Appender(context.Background())
                numOldSeriesPerHistogram := 0

                var oldULID ulid.ULID
                var sparseULID ulid.ULID
                var wg sync.WaitGroup

                wg.Add(1)
                go func() {
                    defer wg.Done()

                    // Ingest sparse histograms.
                    for _, ah := range allSparseSeries {
                        var (
                            ref uint64
                            err error
                        )
                        for i := 0; i < numHistograms; i++ {
                            ts := int64(i) * timeStep
                            ref, err = sparseApp.AppendHistogram(ref, ah.baseLabels, ts, ah.hists[i])
                            require.NoError(t, err)
                        }
                    }
                    require.NoError(t, sparseApp.Commit())

                    // Sparse head compaction.
                    mint := sparseHead.MinTime()
                    maxt := sparseHead.MaxTime() + 1 // Block intervals are half-open: [b.MinTime, b.MaxTime).
                    compactor, err := NewLeveledCompactor(context.Background(), nil, nil, []int64{DefaultBlockDuration}, chunkenc.NewPool(), nil)
                    require.NoError(t, err)
                    sparseULID, err = compactor.Write(sparseHead.opts.ChunkDirRoot, sparseHead, mint, maxt, nil)
                    require.NoError(t, err)
                    require.NotEqual(t, ulid.ULID{}, sparseULID)
                }()

                wg.Add(1)
                go func() {
                    defer wg.Done()

                    // Ingest histograms the old way.
                    for _, ah := range allSparseSeries {
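                        // One conventional series per cumulative le bucket.
                        // The extra (numSpans-1)*gapBetweenSpans slots suggest
                        // that the cumulative iterator also emits buckets for
                        // the positions inside the span gaps.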
                        refs := make([]uint64, c.numBuckets+((c.numSpans-1)*c.gapBetweenSpans))
                        for i := 0; i < numHistograms; i++ {
                            ts := int64(i) * timeStep
                            h := ah.hists[i]
                            numOldSeriesPerHistogram = 0
                            it := histogram.CumulativeExpandSparseHistogram(h)
                            itIdx := 0
                            var err error
                            for it.Next() {
                                numOldSeriesPerHistogram++
                                b := it.At()
                                lbls := append(ah.baseLabels, labels.Label{Name: "le", Value: fmt.Sprintf("%.16f", b.Le)})
                                refs[itIdx], err = oldApp.Append(refs[itIdx], lbls, ts, float64(b.Count))
                                require.NoError(t, err)
                                itIdx++
                            }
                            require.NoError(t, it.Err())

                            // _count metric.
                            countLbls := ah.baseLabels.Copy()
                            countLbls[0].Value = countLbls[0].Value + "_count"
                            _, err = oldApp.Append(0, countLbls, ts, float64(h.Count))
                            require.NoError(t, err)
                            numOldSeriesPerHistogram++

                            // _sum metric.
                            sumLbls := ah.baseLabels.Copy()
                            sumLbls[0].Value = sumLbls[0].Value + "_sum"
                            _, err = oldApp.Append(0, sumLbls, ts, h.Sum)
                            require.NoError(t, err)
                            numOldSeriesPerHistogram++
                        }
                    }
                    require.NoError(t, oldApp.Commit())

                    // Old head compaction.
                    mint := oldHead.MinTime()
                    maxt := oldHead.MaxTime() + 1 // Block intervals are half-open: [b.MinTime, b.MaxTime).
                    compactor, err := NewLeveledCompactor(context.Background(), nil, nil, []int64{DefaultBlockDuration}, chunkenc.NewPool(), nil)
                    require.NoError(t, err)
                    oldULID, err = compactor.Write(oldHead.opts.ChunkDirRoot, oldHead, mint, maxt, nil)
                    require.NoError(t, err)
                    require.NotEqual(t, ulid.ULID{}, oldULID)
                }()

                wg.Wait()
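
                // Both blocks are now written to disk; compare their total,
                // index, and chunks sizes.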
                oldBlockDir := filepath.Join(oldHead.opts.ChunkDirRoot, oldULID.String())
                sparseBlockDir := filepath.Join(sparseHead.opts.ChunkDirRoot, sparseULID.String())

                oldSize, err := fileutil.DirSize(oldBlockDir)
                require.NoError(t, err)
                oldIndexSize, err := fileutil.DirSize(filepath.Join(oldBlockDir, "index"))
                require.NoError(t, err)
                oldChunksSize, err := fileutil.DirSize(filepath.Join(oldBlockDir, "chunks"))
                require.NoError(t, err)

                sparseSize, err := fileutil.DirSize(sparseBlockDir)
                require.NoError(t, err)
                sparseIndexSize, err := fileutil.DirSize(filepath.Join(sparseBlockDir, "index"))
                require.NoError(t, err)
                sparseChunksSize, err := fileutil.DirSize(filepath.Join(sparseBlockDir, "chunks"))
                require.NoError(t, err)

                summaries = append(summaries, testSummary{
                    oldBlockTotalSeries:    len(allSchemas) * c.numSeriesPerSchema * numOldSeriesPerHistogram,
                    oldBlockIndexSize:      oldIndexSize,
                    oldBlockChunksSize:     oldChunksSize,
                    oldBlockTotalSize:      oldSize,
                    sparseBlockTotalSeries: len(allSchemas) * c.numSeriesPerSchema,
                    sparseBlockIndexSize:   sparseIndexSize,
                    sparseBlockChunksSize:  sparseChunksSize,
                    sparseBlockTotalSize:   sparseSize,
                    numBuckets:             c.numBuckets,
                    numSpans:               c.numSpans,
                    gapBetweenSpans:        c.gapBetweenSpans,
                })
            })
    }
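
    // Print the collected summaries; savings are relative to the old-style
    // block, so positive percentages mean the sparse block is smaller.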
    for _, s := range summaries {
        fmt.Printf(`
Meta: NumBuckets=%d, NumSpans=%d, GapBetweenSpans=%d
Old Block: NumSeries=%d, IndexSize=%d, ChunksSize=%d, TotalSize=%d
Sparse Block: NumSeries=%d, IndexSize=%d, ChunksSize=%d, TotalSize=%d
Savings: Index=%.2f%%, Chunks=%.2f%%, Total=%.2f%%
`,
            s.numBuckets, s.numSpans, s.gapBetweenSpans,
            s.oldBlockTotalSeries, s.oldBlockIndexSize, s.oldBlockChunksSize, s.oldBlockTotalSize,
            s.sparseBlockTotalSeries, s.sparseBlockIndexSize, s.sparseBlockChunksSize, s.sparseBlockTotalSize,
            100*(1-float64(s.sparseBlockIndexSize)/float64(s.oldBlockIndexSize)),
            100*(1-float64(s.sparseBlockChunksSize)/float64(s.oldBlockChunksSize)),
            100*(1-float64(s.sparseBlockTotalSize)/float64(s.oldBlockTotalSize)),
        )
    }
}
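
// generateCustomHistograms returns numHists sparse histograms that all share a
// single positive-span layout: the first histogram gets randomized buckets,
// and each subsequent histogram nudges the previous one's bucket values while
// keeping them positive, so the series behaves like a counter without resets.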
func generateCustomHistograms(numHists, numBuckets, numSpans, gapBetweenSpans, schema int) (r []histogram.SparseHistogram) {
    // First histogram with all the settings.
    h := histogram.SparseHistogram{
        Sum:    1000 * rand.Float64(),
        Schema: int32(schema),
    }

    // Generate spans.
    h.PositiveSpans = []histogram.Span{
        {Offset: int32(rand.Intn(10)), Length: uint32(numBuckets)},
    }
    if numSpans > 1 {
        spanWidth := numBuckets / numSpans
        // The first span gets the leftover buckets when numBuckets is not an
        // exact multiple of numSpans.
        h.PositiveSpans[0].Length = uint32(spanWidth + (numBuckets - spanWidth*numSpans))
        for i := 0; i < numSpans-1; i++ {
            h.PositiveSpans = append(h.PositiveSpans, histogram.Span{Offset: int32(rand.Intn(gapBetweenSpans) + 1), Length: uint32(spanWidth)})
        }
    }
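
    // PositiveBuckets is delta-encoded: the first entry is an absolute bucket
    // count and every later entry is the difference from the preceding bucket,
    // which is why the absolute values are tracked separately in firstHistValues.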
    // Generate buckets.
    v := int64(rand.Intn(30) + 1)
    h.PositiveBuckets = []int64{v}
    count := v
    firstHistValues := []int64{v}
    for i := 0; i < numBuckets-1; i++ {
        delta := int64(rand.Intn(20))
        if rand.Int()%2 == 0 && firstHistValues[len(firstHistValues)-1] > delta {
            // Randomly make the delta negative, keeping the current value > 0.
            delta = -delta
        }
        currVal := firstHistValues[len(firstHistValues)-1] + delta
        count += currVal
        firstHistValues = append(firstHistValues, currVal)
        h.PositiveBuckets = append(h.PositiveBuckets, delta)
    }
    h.Count = uint64(count)
    r = append(r, h)

    // Remaining histograms have the same spans but changed bucket values.
    for j := 0; j < numHists-1; j++ {
        newH := h.Copy()
        newH.Sum = float64(j+1) * 1000 * rand.Float64()

        // Generate buckets.
        count := int64(0)
        currVal := int64(0)
        for i := range newH.PositiveBuckets {
            delta := int64(rand.Intn(10))
            if i == 0 {
                newH.PositiveBuckets[i] += delta
                currVal = newH.PositiveBuckets[i]
                count += currVal // Count the first bucket as well.
                continue
            }
            currVal += newH.PositiveBuckets[i]
            if rand.Int()%2 == 0 && (currVal-delta) > firstHistValues[i] {
                // Randomly make the delta negative, keeping the current value
                // > 0 and above the first histogram's value, since we are not
                // simulating counter resets here.
                delta = -delta
            }
            newH.PositiveBuckets[i] += delta
            currVal += delta
            count += currVal
        }
        newH.Count = uint64(count)

        r = append(r, newH)
        h = newH
    }
    return r
}