mirror of
https://github.com/prometheus/prometheus
synced 2024-12-24 23:42:32 +00:00
Merge pull request #7051 from roidelapluie/revertopt
Revert head posting optimization
This commit is contained in:
commit
635fff9ea4
@ -111,9 +111,8 @@ type ChunkReader interface {
|
||||
|
||||
// BlockReader provides reading access to a data block.
|
||||
type BlockReader interface {
|
||||
// Index returns an IndexReader over the block's data within the specified
|
||||
// timeframe.
|
||||
Index(mint, maxt int64) (IndexReader, error)
|
||||
// Index returns an IndexReader over the block's data.
|
||||
Index() (IndexReader, error)
|
||||
|
||||
// Chunks returns a ChunkReader over the block's data.
|
||||
Chunks() (ChunkReader, error)
|
||||
@ -373,7 +372,7 @@ func (pb *Block) startRead() error {
|
||||
}
|
||||
|
||||
// Index returns a new IndexReader against the block data.
|
||||
func (pb *Block) Index(mint, maxt int64) (IndexReader, error) {
|
||||
func (pb *Block) Index() (IndexReader, error) {
|
||||
if err := pb.startRead(); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
@ -471,7 +471,7 @@ func analyzeBlock(b tsdb.BlockReader, limit int) error {
|
||||
// Presume 1ms resolution that Prometheus uses.
|
||||
fmt.Printf("Duration: %s\n", (time.Duration(meta.MaxTime-meta.MinTime) * 1e6).String())
|
||||
fmt.Printf("Series: %d\n", meta.Stats.NumSeries)
|
||||
ir, err := b.Index(math.MinInt64, math.MaxInt64)
|
||||
ir, err := b.Index()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
@ -681,7 +681,7 @@ func (c *LeveledCompactor) populateBlock(blocks []BlockReader, meta *BlockMeta,
|
||||
}
|
||||
}
|
||||
|
||||
indexr, err := b.Index(math.MinInt64, globalMaxt)
|
||||
indexr, err := b.Index()
|
||||
if err != nil {
|
||||
return errors.Wrapf(err, "open index reader for block %s", b)
|
||||
}
|
||||
|
@ -456,10 +456,10 @@ func metaRange(name string, mint, maxt int64, stats *BlockStats) dirMeta {
|
||||
|
||||
type erringBReader struct{}
|
||||
|
||||
func (erringBReader) Index(int64, int64) (IndexReader, error) { return nil, errors.New("index") }
|
||||
func (erringBReader) Chunks() (ChunkReader, error) { return nil, errors.New("chunks") }
|
||||
func (erringBReader) Tombstones() (tombstones.Reader, error) { return nil, errors.New("tombstones") }
|
||||
func (erringBReader) Meta() BlockMeta { return BlockMeta{} }
|
||||
func (erringBReader) Index() (IndexReader, error) { return nil, errors.New("index") }
|
||||
func (erringBReader) Chunks() (ChunkReader, error) { return nil, errors.New("chunks") }
|
||||
func (erringBReader) Tombstones() (tombstones.Reader, error) { return nil, errors.New("tombstones") }
|
||||
func (erringBReader) Meta() BlockMeta { return BlockMeta{} }
|
||||
|
||||
type nopChunkWriter struct{}
|
||||
|
||||
|
@ -1437,7 +1437,7 @@ func TestChunkAtBlockBoundary(t *testing.T) {
|
||||
testutil.Ok(t, err)
|
||||
|
||||
for _, block := range db.Blocks() {
|
||||
r, err := block.Index(math.MinInt64, math.MaxInt64)
|
||||
r, err := block.Index()
|
||||
testutil.Ok(t, err)
|
||||
defer r.Close()
|
||||
|
||||
@ -1787,7 +1787,7 @@ func TestDB_LabelNames(t *testing.T) {
|
||||
appendSamples(db, 0, 4, tst.sampleLabels1)
|
||||
|
||||
// Testing head.
|
||||
headIndexr, err := db.head.Index(math.MinInt64, math.MaxInt64)
|
||||
headIndexr, err := db.head.Index()
|
||||
testutil.Ok(t, err)
|
||||
labelNames, err := headIndexr.LabelNames()
|
||||
testutil.Ok(t, err)
|
||||
@ -1800,7 +1800,7 @@ func TestDB_LabelNames(t *testing.T) {
|
||||
// All blocks have same label names, hence check them individually.
|
||||
// No need to aggregate and check.
|
||||
for _, b := range db.Blocks() {
|
||||
blockIndexr, err := b.Index(math.MinInt64, math.MaxInt64)
|
||||
blockIndexr, err := b.Index()
|
||||
testutil.Ok(t, err)
|
||||
labelNames, err = blockIndexr.LabelNames()
|
||||
testutil.Ok(t, err)
|
||||
|
45
tsdb/head.go
45
tsdb/head.go
@ -763,15 +763,8 @@ func NewRangeHead(head *Head, mint, maxt int64) *RangeHead {
|
||||
}
|
||||
}
|
||||
|
||||
func (h *RangeHead) Index(mint, maxt int64) (IndexReader, error) {
|
||||
// rangeHead guarantees that the series returned are within its range.
|
||||
if mint < h.mint {
|
||||
mint = h.mint
|
||||
}
|
||||
if maxt > h.maxt {
|
||||
maxt = h.maxt
|
||||
}
|
||||
return h.head.indexRange(mint, maxt), nil
|
||||
func (h *RangeHead) Index() (IndexReader, error) {
|
||||
return h.head.indexRange(h.mint, h.maxt), nil
|
||||
}
|
||||
|
||||
func (h *RangeHead) Chunks() (ChunkReader, error) {
|
||||
@ -1196,8 +1189,8 @@ func (h *Head) Tombstones() (tombstones.Reader, error) {
|
||||
}
|
||||
|
||||
// Index returns an IndexReader against the block.
|
||||
func (h *Head) Index(mint, maxt int64) (IndexReader, error) {
|
||||
return h.indexRange(mint, maxt), nil
|
||||
func (h *Head) Index() (IndexReader, error) {
|
||||
return h.indexRange(math.MinInt64, math.MaxInt64), nil
|
||||
}
|
||||
|
||||
func (h *Head) indexRange(mint, maxt int64) *headIndexReader {
|
||||
@ -1390,37 +1383,9 @@ func (h *headIndexReader) LabelNames() ([]string, error) {
|
||||
|
||||
// Postings returns the postings list iterator for the label pairs.
|
||||
func (h *headIndexReader) Postings(name string, values ...string) (index.Postings, error) {
|
||||
fullRange := h.mint <= h.head.MinTime() && h.maxt >= h.head.MaxTime()
|
||||
res := make([]index.Postings, 0, len(values))
|
||||
for _, value := range values {
|
||||
p := h.head.postings.Get(name, value)
|
||||
if fullRange {
|
||||
// The head timerange covers the full index reader timerange.
|
||||
// All the series can the be appended without filtering.
|
||||
res = append(res, p)
|
||||
continue
|
||||
}
|
||||
|
||||
// Filter out series not in the time range, to avoid
|
||||
// later on building up all the chunk metadata just to
|
||||
// discard it.
|
||||
filtered := []uint64{}
|
||||
for p.Next() {
|
||||
s := h.head.series.getByID(p.At())
|
||||
if s == nil {
|
||||
level.Debug(h.head.logger).Log("msg", "looked up series not found")
|
||||
continue
|
||||
}
|
||||
s.RLock()
|
||||
if s.minTime() <= h.maxt && s.maxTime() >= h.mint {
|
||||
filtered = append(filtered, p.At())
|
||||
}
|
||||
s.RUnlock()
|
||||
}
|
||||
if p.Err() != nil {
|
||||
return nil, p.Err()
|
||||
}
|
||||
res = append(res, index.NewListPostings(filtered))
|
||||
res = append(res, h.head.postings.Get(name, value))
|
||||
}
|
||||
return index.Merge(res...), nil
|
||||
}
|
||||
|
@ -1333,94 +1333,6 @@ func TestAddDuplicateLabelName(t *testing.T) {
|
||||
add(labels.Labels{{Name: "__name__", Value: "up"}, {Name: "job", Value: "prometheus"}, {Name: "le", Value: "500"}, {Name: "le", Value: "400"}, {Name: "unit", Value: "s"}}, "le")
|
||||
}
|
||||
|
||||
func TestHeadSeriesWithTimeBoundaries(t *testing.T) {
|
||||
h, err := NewHead(nil, nil, nil, 15, DefaultStripeSize)
|
||||
testutil.Ok(t, err)
|
||||
defer h.Close()
|
||||
testutil.Ok(t, h.Init(0))
|
||||
app := h.Appender()
|
||||
|
||||
s1, err := app.Add(labels.FromStrings("foo1", "bar"), 2, 0)
|
||||
testutil.Ok(t, err)
|
||||
for ts := int64(3); ts < 13; ts++ {
|
||||
err = app.AddFast(s1, ts, 0)
|
||||
testutil.Ok(t, err)
|
||||
}
|
||||
s2, err := app.Add(labels.FromStrings("foo2", "bar"), 5, 0)
|
||||
testutil.Ok(t, err)
|
||||
for ts := int64(6); ts < 11; ts++ {
|
||||
err = app.AddFast(s2, ts, 0)
|
||||
testutil.Ok(t, err)
|
||||
}
|
||||
s3, err := app.Add(labels.FromStrings("foo3", "bar"), 5, 0)
|
||||
testutil.Ok(t, err)
|
||||
err = app.AddFast(s3, 6, 0)
|
||||
testutil.Ok(t, err)
|
||||
_, err = app.Add(labels.FromStrings("foo4", "bar"), 9, 0)
|
||||
testutil.Ok(t, err)
|
||||
|
||||
testutil.Ok(t, app.Commit())
|
||||
|
||||
cases := []struct {
|
||||
mint int64
|
||||
maxt int64
|
||||
seriesCount int
|
||||
samplesCount int
|
||||
}{
|
||||
// foo1 ..00000000000..
|
||||
// foo2 .....000000....
|
||||
// foo3 .....00........
|
||||
// foo4 .........0.....
|
||||
{mint: 0, maxt: 0, seriesCount: 0, samplesCount: 0},
|
||||
{mint: 0, maxt: 1, seriesCount: 0, samplesCount: 0},
|
||||
{mint: 0, maxt: 2, seriesCount: 1, samplesCount: 1},
|
||||
{mint: 2, maxt: 2, seriesCount: 1, samplesCount: 1},
|
||||
{mint: 0, maxt: 4, seriesCount: 1, samplesCount: 3},
|
||||
{mint: 0, maxt: 5, seriesCount: 3, samplesCount: 6},
|
||||
{mint: 0, maxt: 6, seriesCount: 3, samplesCount: 9},
|
||||
{mint: 0, maxt: 7, seriesCount: 3, samplesCount: 11},
|
||||
{mint: 0, maxt: 8, seriesCount: 3, samplesCount: 13},
|
||||
{mint: 0, maxt: 9, seriesCount: 4, samplesCount: 16},
|
||||
{mint: 0, maxt: 10, seriesCount: 4, samplesCount: 18},
|
||||
{mint: 0, maxt: 11, seriesCount: 4, samplesCount: 19},
|
||||
{mint: 0, maxt: 12, seriesCount: 4, samplesCount: 20},
|
||||
{mint: 0, maxt: 13, seriesCount: 4, samplesCount: 20},
|
||||
{mint: 0, maxt: 14, seriesCount: 4, samplesCount: 20},
|
||||
{mint: 2, maxt: 14, seriesCount: 4, samplesCount: 20},
|
||||
{mint: 3, maxt: 14, seriesCount: 4, samplesCount: 19},
|
||||
{mint: 4, maxt: 14, seriesCount: 4, samplesCount: 18},
|
||||
{mint: 8, maxt: 9, seriesCount: 3, samplesCount: 5},
|
||||
{mint: 9, maxt: 9, seriesCount: 3, samplesCount: 3},
|
||||
{mint: 6, maxt: 9, seriesCount: 4, samplesCount: 10},
|
||||
{mint: 11, maxt: 11, seriesCount: 1, samplesCount: 1},
|
||||
{mint: 11, maxt: 12, seriesCount: 1, samplesCount: 2},
|
||||
{mint: 11, maxt: 14, seriesCount: 1, samplesCount: 2},
|
||||
{mint: 12, maxt: 14, seriesCount: 1, samplesCount: 1},
|
||||
}
|
||||
|
||||
for i, c := range cases {
|
||||
matcher := labels.MustNewMatcher(labels.MatchEqual, "", "")
|
||||
q, err := NewBlockQuerier(h, c.mint, c.maxt)
|
||||
testutil.Ok(t, err)
|
||||
|
||||
seriesCount := 0
|
||||
samplesCount := 0
|
||||
ss, _, err := q.Select(false, nil, matcher)
|
||||
testutil.Ok(t, err)
|
||||
for ss.Next() {
|
||||
i := ss.At().Iterator()
|
||||
for i.Next() {
|
||||
samplesCount++
|
||||
}
|
||||
seriesCount++
|
||||
}
|
||||
testutil.Ok(t, ss.Err())
|
||||
testutil.Equals(t, c.seriesCount, seriesCount, "test series %d", i)
|
||||
testutil.Equals(t, c.samplesCount, samplesCount, "test samples %d", i)
|
||||
q.Close()
|
||||
}
|
||||
}
|
||||
|
||||
func TestMemSeriesIsolation(t *testing.T) {
|
||||
// Put a series, select it. GC it and then access it.
|
||||
hb, err := NewHead(nil, nil, nil, 1000, DefaultStripeSize)
|
||||
@ -1428,7 +1340,7 @@ func TestMemSeriesIsolation(t *testing.T) {
|
||||
defer hb.Close()
|
||||
|
||||
lastValue := func(maxAppendID uint64) int {
|
||||
idx, err := hb.Index(hb.MinTime(), hb.MaxTime())
|
||||
idx, err := hb.Index()
|
||||
testutil.Ok(t, err)
|
||||
|
||||
iso := hb.iso.State()
|
||||
|
@ -71,8 +71,8 @@ type mockBReader struct {
|
||||
maxt int64
|
||||
}
|
||||
|
||||
func (r *mockBReader) Index(mint, maxt int64) (IndexReader, error) { return r.ir, nil }
|
||||
func (r *mockBReader) Chunks() (ChunkReader, error) { return r.cr, nil }
|
||||
func (r *mockBReader) Index() (IndexReader, error) { return r.ir, nil }
|
||||
func (r *mockBReader) Chunks() (ChunkReader, error) { return r.cr, nil }
|
||||
func (r *mockBReader) Tombstones() (tombstones.Reader, error) {
|
||||
return tombstones.NewMemTombstones(), nil
|
||||
}
|
||||
|
@ -154,7 +154,7 @@ func (q *verticalQuerier) sel(sortSeries bool, hints *storage.SelectHints, qs []
|
||||
|
||||
// NewBlockQuerier returns a querier against the reader.
|
||||
func NewBlockQuerier(b BlockReader, mint, maxt int64) (storage.Querier, error) {
|
||||
indexr, err := b.Index(mint, maxt)
|
||||
indexr, err := b.Index()
|
||||
if err != nil {
|
||||
return nil, errors.Wrapf(err, "open index reader")
|
||||
}
|
||||
|
@ -16,7 +16,6 @@ package tsdb
|
||||
import (
|
||||
"fmt"
|
||||
"io/ioutil"
|
||||
"math"
|
||||
"os"
|
||||
"strconv"
|
||||
"testing"
|
||||
@ -54,7 +53,7 @@ func BenchmarkPostingsForMatchers(b *testing.B) {
|
||||
}
|
||||
testutil.Ok(b, app.Commit())
|
||||
|
||||
ir, err := h.Index(math.MinInt64, math.MaxInt64)
|
||||
ir, err := h.Index()
|
||||
testutil.Ok(b, err)
|
||||
b.Run("Head", func(b *testing.B) {
|
||||
benchmarkPostingsForMatchers(b, ir)
|
||||
@ -72,7 +71,7 @@ func BenchmarkPostingsForMatchers(b *testing.B) {
|
||||
defer func() {
|
||||
testutil.Ok(b, block.Close())
|
||||
}()
|
||||
ir, err = block.Index(math.MinInt64, math.MaxInt64)
|
||||
ir, err = block.Index()
|
||||
testutil.Ok(b, err)
|
||||
defer ir.Close()
|
||||
b.Run("Block", func(b *testing.B) {
|
||||
|
@ -2137,7 +2137,7 @@ func TestPostingsForMatchers(t *testing.T) {
|
||||
},
|
||||
}
|
||||
|
||||
ir, err := h.Index(math.MinInt64, math.MaxInt64)
|
||||
ir, err := h.Index()
|
||||
testutil.Ok(t, err)
|
||||
|
||||
for _, c := range cases {
|
||||
|
Loading…
Reference in New Issue
Block a user