change labelset comparison in promql engine to avoid false positive during detection of duplicates (#7058)
* Use go1.14 new hash/maphash to hash both RHS and LHS instead of XOR'ing which has been resulting in hash collisions. Signed-off-by: Callum Styan <callumstyan@gmail.com> * Refactor engine labelset signature generation, just use labels.Labels instead of hashes. Signed-off-by: Callum Styan <callumstyan@gmail.com> * Address review comments; function comments + store result of lhs.String+rhs.String as key. Signed-off-by: Callum Styan <callumstyan@gmail.com> * Replace all signatureFunc usage with signatureFuncString. Signed-off-by: Callum Styan <callumstyan@gmail.com> * Make optimizations to labels String function and generation of rhs+lhs as string in resultMetric. Signed-off-by: Callum Styan <callumstyan@gmail.com> * Use separate string functions that don't use strconv just for engine maps. Signed-off-by: Callum Styan <callumstyan@gmail.com> * Use a byte invalid separator instead of quoting and have a buffer attached to EvalNodeHelper instead of using a global pool in the labels package. Signed-off-by: Callum Styan <callumstyan@gmail.com> * Address review comments. Signed-off-by: Callum Styan <callumstyan@gmail.com> * Address more review comments, labels has a function that now builds a byte slice without turning it into a string. Signed-off-by: Callum Styan <callumstyan@gmail.com> * Use two different non-ascii hex codes as byte separators between labels and between sets of labels when building bytes of a Labels struct. Signed-off-by: Callum Styan <callumstyan@gmail.com> * We only need the 2nd byte invalid sep. at the beginning of a labels.Bytes Signed-off-by: Callum Styan <callumstyan@gmail.com>
This commit is contained in:
parent
da217cbde2
commit
5bb7f00d00
|
@ -22,14 +22,15 @@ import (
|
|||
"github.com/cespare/xxhash"
|
||||
)
|
||||
|
||||
const sep = '\xff'
|
||||
|
||||
// Well-known label names used by Prometheus components.
|
||||
const (
|
||||
MetricName = "__name__"
|
||||
AlertName = "alertname"
|
||||
BucketLabel = "le"
|
||||
InstanceName = "instance"
|
||||
|
||||
sep = '\xff'
|
||||
labelSep = '\xfe'
|
||||
)
|
||||
|
||||
// Label is a key/value pair of strings.
|
||||
|
@ -59,10 +60,25 @@ func (ls Labels) String() string {
|
|||
b.WriteString(strconv.Quote(l.Value))
|
||||
}
|
||||
b.WriteByte('}')
|
||||
|
||||
return b.String()
|
||||
}
|
||||
|
||||
// Bytes returns ls as a byte slice.
|
||||
// It uses an byte invalid character as a separator and so should not be used for printing.
|
||||
func (ls Labels) Bytes(buf []byte) []byte {
|
||||
b := bytes.NewBuffer(buf[:0])
|
||||
b.WriteByte(labelSep)
|
||||
for i, l := range ls {
|
||||
if i > 0 {
|
||||
b.WriteByte(sep)
|
||||
}
|
||||
b.WriteString(l.Name)
|
||||
b.WriteByte(sep)
|
||||
b.WriteString(l.Value)
|
||||
}
|
||||
return b.Bytes()
|
||||
}
|
||||
|
||||
// MarshalJSON implements json.Marshaler.
|
||||
func (ls Labels) MarshalJSON() ([]byte, error) {
|
||||
return json.Marshal(ls.Map())
|
||||
|
@ -172,6 +188,44 @@ func (ls Labels) HashWithoutLabels(b []byte, names ...string) (uint64, []byte) {
|
|||
return xxhash.Sum64(b), b
|
||||
}
|
||||
|
||||
// WithLabels returns a new labels.Labels from ls that only contains labels matching names.
|
||||
// 'names' have to be sorted in ascending order.
|
||||
func (ls Labels) WithLabels(names ...string) Labels {
|
||||
ret := make([]Label, 0, len(ls))
|
||||
|
||||
i, j := 0, 0
|
||||
for i < len(ls) && j < len(names) {
|
||||
if names[j] < ls[i].Name {
|
||||
j++
|
||||
} else if ls[i].Name < names[j] {
|
||||
i++
|
||||
} else {
|
||||
ret = append(ret, ls[i])
|
||||
i++
|
||||
j++
|
||||
}
|
||||
}
|
||||
return ret
|
||||
}
|
||||
|
||||
// WithLabels returns a new labels.Labels from ls that contains labels not matching names.
|
||||
// 'names' have to be sorted in ascending order.
|
||||
func (ls Labels) WithoutLabels(names ...string) Labels {
|
||||
ret := make([]Label, 0, len(ls))
|
||||
|
||||
j := 0
|
||||
for i := range ls {
|
||||
for j < len(names) && names[j] < ls[i].Name {
|
||||
j++
|
||||
}
|
||||
if ls[i].Name == MetricName || (j < len(names) && ls[i].Name == names[j]) {
|
||||
continue
|
||||
}
|
||||
ret = append(ret, ls[i])
|
||||
}
|
||||
return ret
|
||||
}
|
||||
|
||||
// Copy returns a copy of the labels.
|
||||
func (ls Labels) Copy() Labels {
|
||||
res := make(Labels, len(ls))
|
||||
|
|
|
@ -14,6 +14,7 @@
|
|||
package promql
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"container/heap"
|
||||
"context"
|
||||
"fmt"
|
||||
|
@ -831,16 +832,20 @@ type EvalNodeHelper struct {
|
|||
// dropMetricName and label_*.
|
||||
dmn map[uint64]labels.Labels
|
||||
// signatureFunc.
|
||||
sigf map[uint64]uint64
|
||||
sigf map[string]string
|
||||
// funcHistogramQuantile.
|
||||
signatureToMetricWithBuckets map[uint64]*metricWithBuckets
|
||||
signatureToMetricWithBuckets map[string]*metricWithBuckets
|
||||
// label_replace.
|
||||
regex *regexp.Regexp
|
||||
|
||||
lb *labels.Builder
|
||||
lblBuf []byte
|
||||
lblResultBuf []byte
|
||||
|
||||
// For binary vector matching.
|
||||
rightSigs map[uint64]Sample
|
||||
matchedSigs map[uint64]map[uint64]struct{}
|
||||
resultMetric map[uint64]labels.Labels
|
||||
rightSigs map[string]Sample
|
||||
matchedSigs map[string]map[uint64]struct{}
|
||||
resultMetric map[string]labels.Labels
|
||||
}
|
||||
|
||||
// dropMetricName is a cached version of dropMetricName.
|
||||
|
@ -858,20 +863,19 @@ func (enh *EvalNodeHelper) dropMetricName(l labels.Labels) labels.Labels {
|
|||
return ret
|
||||
}
|
||||
|
||||
// signatureFunc is a cached version of signatureFunc.
|
||||
func (enh *EvalNodeHelper) signatureFunc(on bool, names ...string) func(labels.Labels) uint64 {
|
||||
func (enh *EvalNodeHelper) signatureFunc(on bool, names ...string) func(labels.Labels) string {
|
||||
if enh.sigf == nil {
|
||||
enh.sigf = make(map[uint64]uint64, len(enh.out))
|
||||
enh.sigf = make(map[string]string, len(enh.out))
|
||||
}
|
||||
f := signatureFunc(on, names...)
|
||||
return func(l labels.Labels) uint64 {
|
||||
h := l.Hash()
|
||||
ret, ok := enh.sigf[h]
|
||||
f := signatureFunc(on, enh.lblBuf, names...)
|
||||
return func(l labels.Labels) string {
|
||||
enh.lblBuf = l.Bytes(enh.lblBuf)
|
||||
ret, ok := enh.sigf[string(enh.lblBuf)]
|
||||
if ok {
|
||||
return ret
|
||||
}
|
||||
ret = f(l)
|
||||
enh.sigf[h] = ret
|
||||
enh.sigf[string(enh.lblBuf)] = ret
|
||||
return ret
|
||||
}
|
||||
}
|
||||
|
@ -1527,7 +1531,7 @@ func (ev *evaluator) VectorAnd(lhs, rhs Vector, matching *parser.VectorMatching,
|
|||
sigf := enh.signatureFunc(matching.On, matching.MatchingLabels...)
|
||||
|
||||
// The set of signatures for the right-hand side Vector.
|
||||
rightSigs := map[uint64]struct{}{}
|
||||
rightSigs := map[string]struct{}{}
|
||||
// Add all rhs samples to a map so we can easily find matches later.
|
||||
for _, rs := range rhs {
|
||||
rightSigs[sigf(rs.Metric)] = struct{}{}
|
||||
|
@ -1548,7 +1552,7 @@ func (ev *evaluator) VectorOr(lhs, rhs Vector, matching *parser.VectorMatching,
|
|||
}
|
||||
sigf := enh.signatureFunc(matching.On, matching.MatchingLabels...)
|
||||
|
||||
leftSigs := map[uint64]struct{}{}
|
||||
leftSigs := map[string]struct{}{}
|
||||
// Add everything from the left-hand-side Vector.
|
||||
for _, ls := range lhs {
|
||||
leftSigs[sigf(ls.Metric)] = struct{}{}
|
||||
|
@ -1569,7 +1573,7 @@ func (ev *evaluator) VectorUnless(lhs, rhs Vector, matching *parser.VectorMatchi
|
|||
}
|
||||
sigf := enh.signatureFunc(matching.On, matching.MatchingLabels...)
|
||||
|
||||
rightSigs := map[uint64]struct{}{}
|
||||
rightSigs := map[string]struct{}{}
|
||||
for _, rs := range rhs {
|
||||
rightSigs[sigf(rs.Metric)] = struct{}{}
|
||||
}
|
||||
|
@ -1598,7 +1602,7 @@ func (ev *evaluator) VectorBinop(op parser.ItemType, lhs, rhs Vector, matching *
|
|||
|
||||
// All samples from the rhs hashed by the matching label/values.
|
||||
if enh.rightSigs == nil {
|
||||
enh.rightSigs = make(map[uint64]Sample, len(enh.out))
|
||||
enh.rightSigs = make(map[string]Sample, len(enh.out))
|
||||
} else {
|
||||
for k := range enh.rightSigs {
|
||||
delete(enh.rightSigs, k)
|
||||
|
@ -1628,7 +1632,7 @@ func (ev *evaluator) VectorBinop(op parser.ItemType, lhs, rhs Vector, matching *
|
|||
// Tracks the match-signature. For one-to-one operations the value is nil. For many-to-one
|
||||
// the value is a set of signatures to detect duplicated result elements.
|
||||
if enh.matchedSigs == nil {
|
||||
enh.matchedSigs = make(map[uint64]map[uint64]struct{}, len(rightSigs))
|
||||
enh.matchedSigs = make(map[string]map[uint64]struct{}, len(rightSigs))
|
||||
} else {
|
||||
for k := range enh.matchedSigs {
|
||||
delete(enh.matchedSigs, k)
|
||||
|
@ -1662,7 +1666,6 @@ func (ev *evaluator) VectorBinop(op parser.ItemType, lhs, rhs Vector, matching *
|
|||
continue
|
||||
}
|
||||
metric := resultMetric(ls.Metric, rs.Metric, op, matching, enh)
|
||||
|
||||
insertedSigs, exists := matchedSigs[sig]
|
||||
if matching.Card == parser.CardOneToOne {
|
||||
if exists {
|
||||
|
@ -1692,19 +1695,15 @@ func (ev *evaluator) VectorBinop(op parser.ItemType, lhs, rhs Vector, matching *
|
|||
return enh.out
|
||||
}
|
||||
|
||||
// signatureFunc returns a function that calculates the signature for a metric
|
||||
// ignoring the provided labels. If on, then the given labels are only used instead.
|
||||
func signatureFunc(on bool, names ...string) func(labels.Labels) uint64 {
|
||||
func signatureFunc(on bool, b []byte, names ...string) func(labels.Labels) string {
|
||||
sort.Strings(names)
|
||||
if on {
|
||||
return func(lset labels.Labels) uint64 {
|
||||
h, _ := lset.HashForLabels(make([]byte, 0, 1024), names...)
|
||||
return h
|
||||
return func(lset labels.Labels) string {
|
||||
return string(lset.WithLabels(names...).Bytes(b))
|
||||
}
|
||||
}
|
||||
return func(lset labels.Labels) uint64 {
|
||||
h, _ := lset.HashWithoutLabels(make([]byte, 0, 1024), names...)
|
||||
return h
|
||||
return func(lset labels.Labels) string {
|
||||
return string(lset.WithoutLabels(names...).Bytes(b))
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -1712,22 +1711,29 @@ func signatureFunc(on bool, names ...string) func(labels.Labels) uint64 {
|
|||
// binary operation and the matching options.
|
||||
func resultMetric(lhs, rhs labels.Labels, op parser.ItemType, matching *parser.VectorMatching, enh *EvalNodeHelper) labels.Labels {
|
||||
if enh.resultMetric == nil {
|
||||
enh.resultMetric = make(map[uint64]labels.Labels, len(enh.out))
|
||||
enh.resultMetric = make(map[string]labels.Labels, len(enh.out))
|
||||
}
|
||||
// op and matching are always the same for a given node, so
|
||||
// there's no need to include them in the hash key.
|
||||
// If the lhs and rhs are the same then the xor would be 0,
|
||||
// so add in one side to protect against that.
|
||||
lh := lhs.Hash()
|
||||
h := (lh ^ rhs.Hash()) + lh
|
||||
if ret, ok := enh.resultMetric[h]; ok {
|
||||
|
||||
if enh.lb == nil {
|
||||
enh.lb = labels.NewBuilder(lhs)
|
||||
} else {
|
||||
enh.lb.Reset(lhs)
|
||||
}
|
||||
|
||||
buf := bytes.NewBuffer(enh.lblResultBuf[:0])
|
||||
enh.lblBuf = lhs.Bytes(enh.lblBuf)
|
||||
buf.Write(enh.lblBuf)
|
||||
enh.lblBuf = rhs.Bytes(enh.lblBuf)
|
||||
buf.Write(enh.lblBuf)
|
||||
enh.lblResultBuf = buf.Bytes()
|
||||
|
||||
if ret, ok := enh.resultMetric[string(enh.lblResultBuf)]; ok {
|
||||
return ret
|
||||
}
|
||||
|
||||
lb := labels.NewBuilder(lhs)
|
||||
str := string(enh.lblResultBuf)
|
||||
|
||||
if shouldDropMetricName(op) {
|
||||
lb.Del(labels.MetricName)
|
||||
enh.lb.Del(labels.MetricName)
|
||||
}
|
||||
|
||||
if matching.Card == parser.CardOneToOne {
|
||||
|
@ -1739,23 +1745,23 @@ func resultMetric(lhs, rhs labels.Labels, op parser.ItemType, matching *parser.V
|
|||
continue Outer
|
||||
}
|
||||
}
|
||||
lb.Del(l.Name)
|
||||
enh.lb.Del(l.Name)
|
||||
}
|
||||
} else {
|
||||
lb.Del(matching.MatchingLabels...)
|
||||
enh.lb.Del(matching.MatchingLabels...)
|
||||
}
|
||||
}
|
||||
for _, ln := range matching.Include {
|
||||
// Included labels from the `group_x` modifier are taken from the "one"-side.
|
||||
if v := rhs.Get(ln); v != "" {
|
||||
lb.Set(ln, v)
|
||||
enh.lb.Set(ln, v)
|
||||
} else {
|
||||
lb.Del(ln)
|
||||
enh.lb.Del(ln)
|
||||
}
|
||||
}
|
||||
|
||||
ret := lb.Labels()
|
||||
enh.resultMetric[h] = ret
|
||||
ret := enh.lb.Labels()
|
||||
enh.resultMetric[str] = ret
|
||||
return ret
|
||||
}
|
||||
|
||||
|
|
|
@ -598,10 +598,10 @@ func funcPredictLinear(vals []parser.Value, args parser.Expressions, enh *EvalNo
|
|||
func funcHistogramQuantile(vals []parser.Value, args parser.Expressions, enh *EvalNodeHelper) Vector {
|
||||
q := vals[0].(Vector)[0].V
|
||||
inVec := vals[1].(Vector)
|
||||
sigf := enh.signatureFunc(false, excludedLabels...)
|
||||
sigf := signatureFunc(false, enh.lblBuf, excludedLabels...)
|
||||
|
||||
if enh.signatureToMetricWithBuckets == nil {
|
||||
enh.signatureToMetricWithBuckets = map[uint64]*metricWithBuckets{}
|
||||
enh.signatureToMetricWithBuckets = map[string]*metricWithBuckets{}
|
||||
} else {
|
||||
for _, v := range enh.signatureToMetricWithBuckets {
|
||||
v.buckets = v.buckets[:0]
|
||||
|
@ -616,16 +616,16 @@ func funcHistogramQuantile(vals []parser.Value, args parser.Expressions, enh *Ev
|
|||
// TODO(beorn7): Issue a warning somehow.
|
||||
continue
|
||||
}
|
||||
hash := sigf(el.Metric)
|
||||
l := sigf(el.Metric)
|
||||
|
||||
mb, ok := enh.signatureToMetricWithBuckets[hash]
|
||||
mb, ok := enh.signatureToMetricWithBuckets[l]
|
||||
if !ok {
|
||||
el.Metric = labels.NewBuilder(el.Metric).
|
||||
Del(labels.BucketLabel, labels.MetricName).
|
||||
Labels()
|
||||
|
||||
mb = &metricWithBuckets{el.Metric, nil}
|
||||
enh.signatureToMetricWithBuckets[hash] = mb
|
||||
enh.signatureToMetricWithBuckets[l] = mb
|
||||
}
|
||||
mb.buckets = append(mb.buckets, bucket{upperBound, el.V})
|
||||
}
|
||||
|
|
|
@ -0,0 +1,11 @@
|
|||
|
||||
load 1s
|
||||
node_namespace_pod:kube_pod_info:{namespace="observability",node="gke-search-infra-custom-96-253440-fli-d135b119-jx00",pod="node-exporter-l454v"} 1
|
||||
node_cpu_seconds_total{cpu="10",endpoint="https",instance="10.253.57.87:9100",job="node-exporter",mode="idle",namespace="observability",pod="node-exporter-l454v",service="node-exporter"} 449
|
||||
node_cpu_seconds_total{cpu="35",endpoint="https",instance="10.253.57.87:9100",job="node-exporter",mode="idle",namespace="observability",pod="node-exporter-l454v",service="node-exporter"} 449
|
||||
node_cpu_seconds_total{cpu="89",endpoint="https",instance="10.253.57.87:9100",job="node-exporter",mode="idle",namespace="observability",pod="node-exporter-l454v",service="node-exporter"} 449
|
||||
|
||||
eval instant at 4s count by(namespace, pod, cpu) (node_cpu_seconds_total{cpu=~".*",job="node-exporter",mode="idle",namespace="observability",pod="node-exporter-l454v"}) * on(namespace, pod) group_left(node) node_namespace_pod:kube_pod_info:{namespace="observability",pod="node-exporter-l454v"}
|
||||
{cpu="10",namespace="observability",node="gke-search-infra-custom-96-253440-fli-d135b119-jx00",pod="node-exporter-l454v"} 1
|
||||
{cpu="35",namespace="observability",node="gke-search-infra-custom-96-253440-fli-d135b119-jx00",pod="node-exporter-l454v"} 1
|
||||
{cpu="89",namespace="observability",node="gke-search-infra-custom-96-253440-fli-d135b119-jx00",pod="node-exporter-l454v"} 1
|
Loading…
Reference in New Issue