Make head Postings only return series in time range
Series() will fetch all the metadata for a series, even if it's going to be
filtered later due to time ranges. For 1M series we save ~1.1s if you only
needed some of the data, but take an extra ~.2s if you did want everything.

benchmark                                  old ns/op      new ns/op      delta
BenchmarkHeadSeries/1of1000000-4           1443715987     131553480      -90.89%
BenchmarkHeadSeries/10of1000000-4          1433394040     130730596      -90.88%
BenchmarkHeadSeries/100of1000000-4         1437444672     131360813      -90.86%
BenchmarkHeadSeries/1000of1000000-4        1438958659     132573137      -90.79%
BenchmarkHeadSeries/10000of1000000-4       1438061766     145742377      -89.87%
BenchmarkHeadSeries/100000of1000000-4      1455060948     281659416      -80.64%
BenchmarkHeadSeries/1000000of1000000-4     1633524504     1803550153     +10.41%

benchmark                                  old allocs     new allocs     delta
BenchmarkHeadSeries/1of1000000-4           4000055        28             -100.00%
BenchmarkHeadSeries/10of1000000-4          4000073        87             -100.00%
BenchmarkHeadSeries/100of1000000-4         4000253        630            -99.98%
BenchmarkHeadSeries/1000of1000000-4        4002053        6036           -99.85%
BenchmarkHeadSeries/10000of1000000-4       4020053        60054          -98.51%
BenchmarkHeadSeries/100000of1000000-4      4200053        600074         -85.71%
BenchmarkHeadSeries/1000000of1000000-4     6000053        6000094        +0.00%

benchmark                                  old bytes      new bytes      delta
BenchmarkHeadSeries/1of1000000-4           229192184      2488           -100.00%
BenchmarkHeadSeries/10of1000000-4          229193336      5568           -100.00%
BenchmarkHeadSeries/100of1000000-4         229204856      35536          -99.98%
BenchmarkHeadSeries/1000of1000000-4        229320056      345104         -99.85%
BenchmarkHeadSeries/10000of1000000-4       230472056      3894673        -98.31%
BenchmarkHeadSeries/100000of1000000-4      241992056      40511632      -83.26%
BenchmarkHeadSeries/1000000of1000000-4     357192056      402380440     +12.65%

Signed-off-by: Brian Brazil <brian.brazil@robustperception.io>
This commit is contained in:
parent 0e51cf65e7
commit cebe36c7d5
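
To make the change concrete before the diff: a head series belongs in the
postings for a query exactly when its own [minTime, maxTime] interval overlaps
the query's [mint, maxt]. Below is a minimal, self-contained sketch of that
overlap test; the function and variable names are ours for illustration, not
part of the patch.

package main

import "fmt"

// overlaps reports whether a series covering [seriesMinT, seriesMaxT]
// intersects the query range [mint, maxt]. It mirrors the check
// s.minTime() <= h.maxt && s.maxTime() >= h.mint in the diff below.
func overlaps(seriesMinT, seriesMaxT, mint, maxt int64) bool {
	return seriesMinT <= maxt && seriesMaxT >= mint
}

func main() {
	fmt.Println(overlaps(0, 10, 5, 15))  // true: the intervals intersect
	fmt.Println(overlaps(0, 10, 11, 20)) // false: the series ends before the query starts
}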
@@ -112,7 +112,7 @@ type ChunkReader interface {
 // BlockReader provides reading access to a data block.
 type BlockReader interface {
 	// Index returns an IndexReader over the block's data.
-	Index() (IndexReader, error)
+	Index(mint, maxt int64) (IndexReader, error)
 
 	// Chunks returns a ChunkReader over the block's data.
 	Chunks() (ChunkReader, error)
@@ -372,7 +372,7 @@ func (pb *Block) startRead() error {
 }
 
 // Index returns a new IndexReader against the block data.
-func (pb *Block) Index() (IndexReader, error) {
+func (pb *Block) Index(mint, maxt int64) (IndexReader, error) {
 	if err := pb.startRead(); err != nil {
 		return nil, err
 	}
@@ -683,7 +683,7 @@ func (c *LeveledCompactor) populateBlock(blocks []BlockReader, meta *BlockMeta,
 			}
 		}
 
-		indexr, err := b.Index()
+		indexr, err := b.Index(math.MinInt64, math.MaxInt64)
 		if err != nil {
 			return errors.Wrapf(err, "open index reader for block %s", b)
 		}
tsdb/head.go (26 lines changed)
@@ -763,7 +763,7 @@ func NewRangeHead(head *Head, mint, maxt int64) *RangeHead {
 	}
 }
 
-func (h *RangeHead) Index() (IndexReader, error) {
+func (h *RangeHead) Index(mint, maxt int64) (IndexReader, error) {
 	return h.head.indexRange(h.mint, h.maxt), nil
 }
 
@@ -1162,8 +1162,8 @@ func (h *Head) Tombstones() (tombstones.Reader, error) {
 }
 
 // Index returns an IndexReader against the block.
-func (h *Head) Index() (IndexReader, error) {
-	return h.indexRange(math.MinInt64, math.MaxInt64), nil
+func (h *Head) Index(mint, maxt int64) (IndexReader, error) {
+	return h.indexRange(mint, maxt), nil
 }
 
 func (h *Head) indexRange(mint, maxt int64) *headIndexReader {
@@ -1349,7 +1349,25 @@ func (h *headIndexReader) LabelNames() ([]string, error) {
 func (h *headIndexReader) Postings(name string, values ...string) (index.Postings, error) {
 	res := make([]index.Postings, 0, len(values))
 	for _, value := range values {
-		res = append(res, h.head.postings.Get(name, value))
+		p := h.head.postings.Get(name, value)
+		// Filter out series not in the time range, to avoid
+		// later on building up all the chunk metadata just to
+		// discard it.
+		filtered := []uint64{}
+		for p.Next() {
+			s := h.head.series.getByID(p.At())
+			if s == nil {
+				level.Debug(h.head.logger).Log("msg", "looked up series not found")
+				continue
+			}
+			if s.minTime() <= h.maxt && s.maxTime() >= h.mint {
+				filtered = append(filtered, p.At())
+			}
+		}
+		if p.Err() != nil {
+			return nil, p.Err()
+		}
+		res = append(res, index.NewListPostings(filtered))
 	}
 	return index.Merge(res...), nil
 }
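A note on the trade-off in the hunk above: the new code walks every posting
for a matched label value and eagerly materializes the in-range refs into the
filtered slice before wrapping them with index.NewListPostings. That extra
pass is what the 1000000of1000000 benchmark case pays for (+10.41% ns/op,
+12.65% bytes), while queries over a narrow time window skip building chunk
metadata for out-of-range series entirely, which is where the ~90%
improvements come from.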
@@ -14,6 +14,7 @@
 package tsdb
 
 import (
+	"fmt"
 	"strconv"
 	"sync/atomic"
 	"testing"
@@ -48,3 +49,35 @@ func BenchmarkHeadStripeSeriesCreateParallel(b *testing.B) {
 		}
 	})
 }
+
+func BenchmarkHeadSeries(b *testing.B) {
+	h, err := NewHead(nil, nil, nil, 1000)
+	testutil.Ok(b, err)
+	defer h.Close()
+	app := h.Appender()
+	numSeries := 1000000
+	for i := 0; i < numSeries; i++ {
+		app.Add(labels.FromStrings("foo", "bar", "i", strconv.Itoa(i)), int64(i), 0)
+	}
+	testutil.Ok(b, app.Commit())
+
+	matcher := labels.MustNewMatcher(labels.MatchEqual, "foo", "bar")
+
+	for s := 1; s <= numSeries; s *= 10 {
+		b.Run(fmt.Sprintf("%dof%d", s, numSeries), func(b *testing.B) {
+			q, err := NewBlockQuerier(h, 0, int64(s-1))
+			testutil.Ok(b, err)
+
+			b.ResetTimer()
+			for i := 0; i < b.N; i++ {
+				ss, err := q.Select(matcher)
+				testutil.Ok(b, err)
+				for ss.Next() {
+				}
+				testutil.Ok(b, ss.Err())
+			}
+			q.Close()
+		})
+
+	}
+}
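For reference, a benchmark like this is typically driven with the standard Go
tooling, e.g. go test -run '^$' -bench BenchmarkHeadSeries -benchmem ./tsdb/
(the -benchmem flag is what produces the allocs and bytes columns quoted in
the commit message); the exact invocation is our assumption, not part of the
patch.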
@@ -160,7 +160,7 @@ func (q *verticalQuerier) sel(p *storage.SelectParams, qs []storage.Querier, ms
 
 // NewBlockQuerier returns a querier against the reader.
 func NewBlockQuerier(b BlockReader, mint, maxt int64) (storage.Querier, error) {
-	indexr, err := b.Index()
+	indexr, err := b.Index(mint, maxt)
 	if err != nil {
 		return nil, errors.Wrapf(err, "open index reader")
 	}