Add prefix label matcher.
Implement labels.PrefixMatcher and use interface conversion in the querier to optimize the label tuples search. [unit-tests]: Fix a bug and populate the label index for the mock index.

Signed-off-by: Dmitry Ilyevsky <ilyevsky@gmail.com>
commit 37194b7a30 (parent 1378338aab)
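For orientation, a standalone sketch (not part of the diff) of the semantics being added: a prefix matcher selects series whose value for a given label starts with a fixed prefix, which is what the new TestBlockQuerier case below asserts for label "p" and prefix "abc". The label sets are copied from the test data added further down; everything else is illustrative.

package main

import (
	"fmt"
	"strings"
)

func main() {
	// Label sets of the three series added to the test data below.
	series := []map[string]string{
		{"p": "abcd", "x": "xyz"},
		{"a": "ab", "p": "abce"},
		{"p": "xyz"},
	}
	// Only the first two have a "p" value starting with "abc".
	for _, lset := range series {
		if strings.HasPrefix(lset["p"], "abc") {
			fmt.Println(lset)
		}
	}
}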
@@ -13,7 +13,10 @@
 package labels
 
-import "regexp"
+import (
+	"regexp"
+	"strings"
+)
 
 // Selector holds constraints for matching against a label set.
 type Selector []Matcher
 
@@ -84,3 +87,22 @@ func (m *notMatcher) Matches(v string) bool { return !m.Matcher.Matches(v) }
 func Not(m Matcher) Matcher {
 	return &notMatcher{m}
 }
+
+// PrefixMatcher implements Matcher for labels whose values match a prefix.
+type PrefixMatcher struct {
+	name, prefix string
+}
+
+// NewPrefixMatcher returns a new Matcher for the label name matching the given prefix.
+func NewPrefixMatcher(name, prefix string) Matcher {
+	return &PrefixMatcher{name: name, prefix: prefix}
+}
+
+// Name implements the Matcher interface.
+func (m *PrefixMatcher) Name() string { return m.name }
+
+// Prefix returns the matching prefix.
+func (m *PrefixMatcher) Prefix() string { return m.prefix }
+
+// Matches implements the Matcher interface.
+func (m *PrefixMatcher) Matches(v string) bool { return strings.HasPrefix(v, m.prefix) }
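A brief usage sketch of the new matcher (illustrative, not part of the commit; it assumes the package is imported as github.com/prometheus/tsdb/labels, in line with the tsdb import paths seen elsewhere in this diff):

package main

import (
	"fmt"

	"github.com/prometheus/tsdb/labels"
)

func main() {
	// Matches any value of label "p" that starts with "abc".
	m := labels.NewPrefixMatcher("p", "abc")

	fmt.Println(m.Name())          // "p"
	fmt.Println(m.Matches("abcd")) // true
	fmt.Println(m.Matches("xyz"))  // false
}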
querier.go
@@ -15,6 +15,7 @@ package tsdb
 import (
 	"fmt"
+	"sort"
 	"strings"
 
 	"github.com/prometheus/tsdb/chunks"
@@ -220,6 +221,37 @@ func (r *postingsReader) Select(ms ...labels.Matcher) (Postings, []string) {
 	return p, absent
 }
 
+// tuplesByPrefix uses binary search to find prefix matches within ts.
+func tuplesByPrefix(m *labels.PrefixMatcher, ts StringTuples) ([]string, error) {
+	var outErr error
+	tslen := ts.Len()
+	i := sort.Search(tslen, func(i int) bool {
+		vs, err := ts.At(i)
+		if err != nil {
+			outErr = fmt.Errorf("Failed to read tuple %d/%d: %v", i, tslen, err)
+			return true
+		}
+		val := vs[0]
+		l := len(m.Prefix())
+		if l > len(val) {
+			l = len(val) // never slice past the end of the value
+		}
+		return val[:l] >= m.Prefix()
+	})
+	if outErr != nil {
+		return nil, outErr
+	}
+	var matches []string
+	for ; i < tslen; i++ {
+		vs, err := ts.At(i)
+		if err != nil || !m.Matches(vs[0]) {
+			return matches, err
+		}
+		matches = append(matches, vs[0])
+	}
+	return matches, nil
+}
+
 func (r *postingsReader) selectSingle(m labels.Matcher) Postings {
 	// Fast-path for equal matching.
 	if em, ok := m.(*labels.EqualMatcher); ok {
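The function above relies on the label values handed out by the index being lexicographically sorted: all values sharing a prefix then form one contiguous run, so a binary search finds the start of the run and a forward scan collects it. A self-contained sketch of that idea with plain strings (values borrowed from the test data; the real code works on StringTuples rather than a []string):

package main

import (
	"fmt"
	"sort"
	"strings"
)

func main() {
	vals := []string{"abcd", "abce", "xyz"} // sorted, as an index would return them
	prefix := "abc"

	// First index whose value is >= prefix, i.e. where a matching run would start.
	i := sort.SearchStrings(vals, prefix)

	var matches []string
	for ; i < len(vals) && strings.HasPrefix(vals[i], prefix); i++ {
		matches = append(matches, vals[i])
	}
	fmt.Println(matches) // [abcd abce]
}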
@@ -230,15 +262,19 @@ func (r *postingsReader) selectSingle(m labels.Matcher) Postings {
 		return it
 	}
 
-	// TODO(fabxc): use interface upgrading to provide fast solution
-	// for prefix matches. Tuples are lexicographically sorted.
 	tpls, err := r.index.LabelValues(m.Name())
 	if err != nil {
 		return errPostings{err: err}
 	}
 
 	var res []string
+	if pm, ok := m.(*labels.PrefixMatcher); ok {
+		res, err = tuplesByPrefix(pm, tpls)
+		if err != nil {
+			return errPostings{err: err}
+		}
+
+	} else {
 		for i := 0; i < tpls.Len(); i++ {
 			vals, err := tpls.At(i)
 			if err != nil {
@@ -248,6 +284,7 @@ func (r *postingsReader) selectSingle(m labels.Matcher) Postings {
 				res = append(res, vals[0])
 			}
 		}
+	}
 
 	if len(res) == 0 {
 		return emptyPostings
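The selectSingle change above is the "interface conversion" the commit message refers to: the generic labels.Matcher is type-asserted to the concrete *labels.PrefixMatcher, and only when that succeeds is the sorted-order shortcut taken; otherwise every value is checked as before. A minimal self-contained sketch of this dispatch pattern, using hypothetical stand-in types rather than the tsdb ones:

package main

import (
	"fmt"
	"sort"
	"strings"
)

// matcher mirrors the shape of labels.Matcher for this sketch.
type matcher interface {
	Matches(v string) bool
}

// prefixMatcher is a stand-in for labels.PrefixMatcher.
type prefixMatcher struct{ prefix string }

func (m *prefixMatcher) Matches(v string) bool { return strings.HasPrefix(v, m.prefix) }

// selectValues picks matching values from a sorted slice. If the matcher
// turns out to be a prefixMatcher, it binary-searches to the start of the
// matching run instead of scanning everything.
func selectValues(m matcher, sorted []string) []string {
	var res []string
	if pm, ok := m.(*prefixMatcher); ok {
		for i := sort.SearchStrings(sorted, pm.prefix); i < len(sorted) && pm.Matches(sorted[i]); i++ {
			res = append(res, sorted[i])
		}
		return res
	}
	// Generic fallback: check every value.
	for _, v := range sorted {
		if m.Matches(v) {
			res = append(res, v)
		}
	}
	return res
}

func main() {
	vals := []string{"abcd", "abce", "xyz"}
	fmt.Println(selectValues(&prefixMatcher{prefix: "abc"}, vals)) // [abcd abce]
}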
querier_test.go
@@ -230,6 +230,7 @@ func createIdxChkReaders(tc []struct {
 
 	postings := &memPostings{m: make(map[term][]uint32, 512)}
 	chkReader := mockChunkReader(make(map[uint64]chunks.Chunk))
+	lblIdx := make(map[string]stringset)
 	mi := newMockIndex()
 
 	for i, s := range tc {
@@ -253,16 +254,28 @@ func createIdxChkReaders(tc []struct {
 			chkReader[ref] = chunk
 		}
 
-		mi.AddSeries(uint32(i), labels.FromMap(s.lset), metas...)
+		ls := labels.FromMap(s.lset)
+		mi.AddSeries(uint32(i), ls, metas...)
 
 		postings.add(uint32(i), term{})
-		for _, l := range labels.FromMap(s.lset) {
+		for _, l := range ls {
 			postings.add(uint32(i), term{l.Name, l.Value})
+
+			vs, present := lblIdx[l.Name]
+			if !present {
+				vs = stringset{}
+				lblIdx[l.Name] = vs
+			}
+			vs.set(l.Value)
 		}
 	}
 
+	for l, vs := range lblIdx {
+		mi.WriteLabelIndex([]string{l}, vs.slice())
+	}
+
 	for tm := range postings.m {
-		mi.WritePostings(tm.name, tm.name, postings.get(tm))
+		mi.WritePostings(tm.name, tm.value, postings.get(tm))
 	}
 
 	return mi, chkReader
@@ -334,6 +347,47 @@ func TestBlockQuerier(t *testing.T) {
 					},
 				},
 			},
+			{
+				lset: map[string]string{
+					"p": "abcd",
+					"x": "xyz",
+				},
+				chunks: [][]sample{
+					{
+						{1, 2}, {2, 3}, {3, 4},
+					},
+					{
+						{5, 2}, {6, 3}, {7, 4},
+					},
+				},
+			},
+			{
+				lset: map[string]string{
+					"a": "ab",
+					"p": "abce",
+				},
+				chunks: [][]sample{
+					{
+						{1, 1}, {2, 2}, {3, 3},
+					},
+					{
+						{5, 3}, {6, 6},
+					},
+				},
+			},
+			{
+				lset: map[string]string{
+					"p": "xyz",
+				},
+				chunks: [][]sample{
+					{
+						{1, 1}, {2, 2}, {3, 3},
+					},
+					{
+						{4, 4}, {5, 5}, {6, 6},
+					},
+				},
+			},
 		},
 
 		queries: []query{
@@ -373,11 +427,30 @@ func TestBlockQuerier(t *testing.T) {
 					),
 				}),
 			},
+			{
+				mint: 2,
+				maxt: 6,
+				ms:   []labels.Matcher{labels.NewPrefixMatcher("p", "abc")},
+				exp: newListSeriesSet([]Series{
+					newSeries(map[string]string{
+						"p": "abcd",
+						"x": "xyz",
+					},
+						[]sample{{2, 3}, {3, 4}, {5, 2}, {6, 3}},
+					),
+					newSeries(map[string]string{
+						"a": "ab",
+						"p": "abce",
+					},
+						[]sample{{2, 2}, {3, 3}, {5, 3}, {6, 6}},
+					),
+				}),
+			},
 		},
 	}
 
 Outer:
-	for _, c := range cases.queries {
+	for i, c := range cases.queries {
 		ir, cr := createIdxChkReaders(cases.data)
 		querier := &blockQuerier{
 			index: ir,
@@ -392,7 +465,7 @@ Outer:
 
 		for {
 			eok, rok := c.exp.Next(), res.Next()
-			require.Equal(t, eok, rok, "next")
+			require.Equal(t, eok, rok, "%d: next", i)
 
 			if !eok {
 				continue Outer
@@ -400,13 +473,13 @@ Outer:
 			sexp := c.exp.At()
 			sres := res.At()
 
-			require.Equal(t, sexp.Labels(), sres.Labels(), "labels")
+			require.Equal(t, sexp.Labels(), sres.Labels(), "%d: labels", i)
 
 			smplExp, errExp := expandSeriesIterator(sexp.Iterator())
 			smplRes, errRes := expandSeriesIterator(sres.Iterator())
 
-			require.Equal(t, errExp, errRes, "samples error")
-			require.Equal(t, smplExp, smplRes, "samples")
+			require.Equal(t, errExp, errRes, "%d: samples error", i)
+			require.Equal(t, smplExp, smplRes, "%d: samples", i)
 		}
 	}