[Benchmark] TSDB: Add BenchmarkQuerierSelectWithOutOfOrder
Refactor existing BenchmarkQuerierSelect to provide the set-up. Note that Head queries now run faster because they use a RangeHead. Signed-off-by: Bryan Boreham <bjboreham@gmail.com>
This commit is contained in:
parent
ff0a1e5e11
commit
0c852680bf
|
@ -467,6 +467,11 @@ func (pb *Block) setCompactionFailed() error {
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Querier implements Queryable.
|
||||||
|
func (pb *Block) Querier(mint, maxt int64) (storage.Querier, error) {
|
||||||
|
return NewBlockQuerier(pb, mint, maxt)
|
||||||
|
}
|
||||||
|
|
||||||
type blockIndexReader struct {
|
type blockIndexReader struct {
|
||||||
ir IndexReader
|
ir IndexReader
|
||||||
b *Block
|
b *Block
|
||||||
|
|
|
@ -20,6 +20,7 @@ import (
|
||||||
"testing"
|
"testing"
|
||||||
|
|
||||||
"github.com/prometheus/prometheus/model/labels"
|
"github.com/prometheus/prometheus/model/labels"
|
||||||
|
"github.com/prometheus/prometheus/storage"
|
||||||
"github.com/prometheus/prometheus/tsdb/index"
|
"github.com/prometheus/prometheus/tsdb/index"
|
||||||
|
|
||||||
"github.com/stretchr/testify/require"
|
"github.com/stretchr/testify/require"
|
||||||
|
@ -254,56 +255,91 @@ func BenchmarkMergedStringIter(b *testing.B) {
|
||||||
b.ReportAllocs()
|
b.ReportAllocs()
|
||||||
}
|
}
|
||||||
|
|
||||||
func BenchmarkQuerierSelect(b *testing.B) {
|
func createHeadForBenchmarkSelect(b *testing.B, numSeries int, addSeries func(app storage.Appender, i int)) (*Head, *DB) {
|
||||||
opts := DefaultHeadOptions()
|
dir := b.TempDir()
|
||||||
opts.ChunkRange = 1000
|
opts := DefaultOptions()
|
||||||
opts.ChunkDirRoot = b.TempDir()
|
opts.OutOfOrderCapMax = 255
|
||||||
h, err := NewHead(nil, nil, nil, nil, opts, nil)
|
opts.OutOfOrderTimeWindow = 1000
|
||||||
|
db, err := Open(dir, nil, nil, opts, nil)
|
||||||
require.NoError(b, err)
|
require.NoError(b, err)
|
||||||
defer h.Close()
|
b.Cleanup(func() {
|
||||||
|
require.NoError(b, db.Close())
|
||||||
|
})
|
||||||
|
h := db.Head()
|
||||||
|
|
||||||
app := h.Appender(context.Background())
|
app := h.Appender(context.Background())
|
||||||
numSeries := 1000000
|
|
||||||
for i := 0; i < numSeries; i++ {
|
for i := 0; i < numSeries; i++ {
|
||||||
app.Append(0, labels.FromStrings("foo", "bar", "i", fmt.Sprintf("%d%s", i, postingsBenchSuffix)), int64(i), 0)
|
addSeries(app, i)
|
||||||
}
|
}
|
||||||
require.NoError(b, app.Commit())
|
require.NoError(b, app.Commit())
|
||||||
|
return h, db
|
||||||
|
}
|
||||||
|
|
||||||
bench := func(b *testing.B, br BlockReader, sorted bool) {
|
func benchmarkSelect(b *testing.B, queryable storage.Queryable, numSeries int, sorted bool) {
|
||||||
matcher := labels.MustNewMatcher(labels.MatchEqual, "foo", "bar")
|
matcher := labels.MustNewMatcher(labels.MatchEqual, "foo", "bar")
|
||||||
for s := 1; s <= numSeries; s *= 10 {
|
b.ResetTimer()
|
||||||
b.Run(fmt.Sprintf("%dof%d", s, numSeries), func(b *testing.B) {
|
for s := 1; s <= numSeries; s *= 10 {
|
||||||
q, err := NewBlockQuerier(br, 0, int64(s-1))
|
b.Run(fmt.Sprintf("%dof%d", s, numSeries), func(b *testing.B) {
|
||||||
require.NoError(b, err)
|
q, err := queryable.Querier(0, int64(s-1))
|
||||||
|
require.NoError(b, err)
|
||||||
|
|
||||||
b.ResetTimer()
|
b.ResetTimer()
|
||||||
for i := 0; i < b.N; i++ {
|
for i := 0; i < b.N; i++ {
|
||||||
ss := q.Select(context.Background(), sorted, nil, matcher)
|
ss := q.Select(context.Background(), sorted, nil, matcher)
|
||||||
for ss.Next() {
|
for ss.Next() {
|
||||||
}
|
|
||||||
require.NoError(b, ss.Err())
|
|
||||||
}
|
}
|
||||||
q.Close()
|
require.NoError(b, ss.Err())
|
||||||
})
|
}
|
||||||
}
|
q.Close()
|
||||||
|
})
|
||||||
}
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func BenchmarkQuerierSelect(b *testing.B) {
|
||||||
|
numSeries := 1000000
|
||||||
|
h, db := createHeadForBenchmarkSelect(b, numSeries, func(app storage.Appender, i int) {
|
||||||
|
_, err := app.Append(0, labels.FromStrings("foo", "bar", "i", fmt.Sprintf("%d%s", i, postingsBenchSuffix)), int64(i), 0)
|
||||||
|
if err != nil {
|
||||||
|
b.Fatal(err)
|
||||||
|
}
|
||||||
|
})
|
||||||
|
|
||||||
b.Run("Head", func(b *testing.B) {
|
b.Run("Head", func(b *testing.B) {
|
||||||
bench(b, h, false)
|
benchmarkSelect(b, db, numSeries, false)
|
||||||
})
|
})
|
||||||
b.Run("SortedHead", func(b *testing.B) {
|
b.Run("SortedHead", func(b *testing.B) {
|
||||||
bench(b, h, true)
|
benchmarkSelect(b, db, numSeries, true)
|
||||||
})
|
})
|
||||||
|
|
||||||
tmpdir := b.TempDir()
|
|
||||||
|
|
||||||
blockdir := createBlockFromHead(b, tmpdir, h)
|
|
||||||
block, err := OpenBlock(nil, blockdir, nil)
|
|
||||||
require.NoError(b, err)
|
|
||||||
defer func() {
|
|
||||||
require.NoError(b, block.Close())
|
|
||||||
}()
|
|
||||||
|
|
||||||
b.Run("Block", func(b *testing.B) {
|
b.Run("Block", func(b *testing.B) {
|
||||||
bench(b, block, false)
|
tmpdir := b.TempDir()
|
||||||
|
|
||||||
|
blockdir := createBlockFromHead(b, tmpdir, h)
|
||||||
|
block, err := OpenBlock(nil, blockdir, nil)
|
||||||
|
require.NoError(b, err)
|
||||||
|
defer func() {
|
||||||
|
require.NoError(b, block.Close())
|
||||||
|
}()
|
||||||
|
|
||||||
|
benchmarkSelect(b, block, numSeries, false)
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
func BenchmarkQuerierSelectWithOutOfOrder(b *testing.B) {
|
||||||
|
numSeries := 1000000
|
||||||
|
_, db := createHeadForBenchmarkSelect(b, numSeries, func(app storage.Appender, i int) {
|
||||||
|
l := labels.FromStrings("foo", "bar", "i", fmt.Sprintf("%d%s", i, postingsBenchSuffix))
|
||||||
|
ref, err := app.Append(0, l, int64(i+1), 0)
|
||||||
|
if err != nil {
|
||||||
|
b.Fatal(err)
|
||||||
|
}
|
||||||
|
_, err = app.Append(ref, l, int64(i), 1) // Out of order sample
|
||||||
|
if err != nil {
|
||||||
|
b.Fatal(err)
|
||||||
|
}
|
||||||
|
})
|
||||||
|
|
||||||
|
b.Run("Head", func(b *testing.B) {
|
||||||
|
benchmarkSelect(b, db, numSeries, false)
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue