Merge pull request #700 from prometheus/fabxc/binops
Improve vector binops evaluation.
This commit is contained in:
commit
30b346a430
276
promql/engine.go
276
promql/engine.go
|
@ -592,8 +592,14 @@ func (ev *evaluator) eval(expr Expr) Value {
|
|||
}
|
||||
|
||||
case lt == ExprVector && rt == ExprVector:
|
||||
return ev.vectorBinop(e.Op, lhs.(Vector), rhs.(Vector), e.VectorMatching)
|
||||
|
||||
switch e.Op {
|
||||
case itemLAND:
|
||||
return ev.vectorAnd(lhs.(Vector), rhs.(Vector), e.VectorMatching)
|
||||
case itemLOR:
|
||||
return ev.vectorOr(lhs.(Vector), rhs.(Vector), e.VectorMatching)
|
||||
default:
|
||||
return ev.vectorBinop(e.Op, lhs.(Vector), rhs.(Vector), e.VectorMatching)
|
||||
}
|
||||
case lt == ExprVector && rt == ExprScalar:
|
||||
return ev.vectorScalarBinop(e.Op, lhs.(Vector), rhs.(*Scalar), false)
|
||||
|
||||
|
@ -698,111 +704,173 @@ func (ev *evaluator) matrixSelectorBounds(node *MatrixSelector) Matrix {
|
|||
return Matrix(sampleStreams)
|
||||
}
|
||||
|
||||
// vectorBinop evaluates a binary operation between two vector values.
|
||||
func (ev *evaluator) vectorAnd(lhs, rhs Vector, matching *VectorMatching) Vector {
|
||||
if matching.Card != CardManyToMany {
|
||||
panic("logical operations must always be many-to-many matching")
|
||||
}
|
||||
// If no matching labels are specified, match by all labels.
|
||||
sigf := signatureFunc(matching.On...)
|
||||
|
||||
var result Vector
|
||||
// The set of signatures for the right-hand side vector.
|
||||
rightSigs := map[uint64]struct{}{}
|
||||
// Add all rhs samples to a map so we can easily find matches later.
|
||||
for _, rs := range rhs {
|
||||
rightSigs[sigf(rs.Metric)] = struct{}{}
|
||||
}
|
||||
|
||||
for _, ls := range lhs {
|
||||
// If there's a matching entry in the right-hand side vector, add the sample.
|
||||
if _, ok := rightSigs[sigf(ls.Metric)]; ok {
|
||||
result = append(result, ls)
|
||||
}
|
||||
}
|
||||
return result
|
||||
}
|
||||
|
||||
func (ev *evaluator) vectorOr(lhs, rhs Vector, matching *VectorMatching) Vector {
|
||||
if matching.Card != CardManyToMany {
|
||||
panic("logical operations must always be many-to-many matching")
|
||||
}
|
||||
sigf := signatureFunc(matching.On...)
|
||||
|
||||
var result Vector
|
||||
leftSigs := map[uint64]struct{}{}
|
||||
// Add everything from the left-hand-side vector.
|
||||
for _, ls := range lhs {
|
||||
leftSigs[sigf(ls.Metric)] = struct{}{}
|
||||
result = append(result, ls)
|
||||
}
|
||||
// Add all right-hand side elements which have not been added from the left-hand side.
|
||||
for _, rs := range rhs {
|
||||
if _, ok := leftSigs[sigf(rs.Metric)]; !ok {
|
||||
result = append(result, rs)
|
||||
}
|
||||
}
|
||||
return result
|
||||
}
|
||||
|
||||
// vectorBinop evaluates a binary operation between two vector, excluding AND and OR.
|
||||
func (ev *evaluator) vectorBinop(op itemType, lhs, rhs Vector, matching *VectorMatching) Vector {
|
||||
result := make(Vector, 0, len(rhs))
|
||||
if matching.Card == CardManyToMany {
|
||||
panic("many-to-many only allowed for AND and OR")
|
||||
}
|
||||
var (
|
||||
result = Vector{}
|
||||
sigf = signatureFunc(matching.On...)
|
||||
resultLabels = append(matching.On, matching.Include...)
|
||||
)
|
||||
|
||||
// The control flow below handles one-to-one or many-to-one matching.
|
||||
// For one-to-many, swap sidedness and account for the swap when calculating
|
||||
// values.
|
||||
if matching.Card == CardOneToMany {
|
||||
lhs, rhs = rhs, lhs
|
||||
}
|
||||
|
||||
// All samples from the rhs hashed by the matching label/values.
|
||||
rm := map[uint64]*Sample{}
|
||||
// Maps the hash of the label values used for matching to the hashes of the label
|
||||
// values of the include labels (if any). It is used to keep track of already
|
||||
// inserted samples.
|
||||
added := map[uint64][]uint64{}
|
||||
rightSigs := map[uint64]*Sample{}
|
||||
|
||||
// Add all rhs samples to a map so we can easily find matches later.
|
||||
for _, rs := range rhs {
|
||||
hash := hashForMetric(rs.Metric.Metric, matching.On)
|
||||
sig := sigf(rs.Metric)
|
||||
// The rhs is guaranteed to be the 'one' side. Having multiple samples
|
||||
// with the same hash means that the matching is many-to-many,
|
||||
// which is not supported.
|
||||
if _, found := rm[hash]; matching.Card != CardManyToMany && found {
|
||||
// with the same signature means that the matching is many-to-many.
|
||||
if _, found := rightSigs[sig]; found {
|
||||
// Many-to-many matching not allowed.
|
||||
ev.errorf("many-to-many matching not allowed")
|
||||
ev.errorf("many-to-many matching not allowed: matching labels must be unique on one side")
|
||||
}
|
||||
// In many-to-many matching the entry is simply overwritten. It can thus only
|
||||
// be used to check whether any matching rhs entry exists but not retrieve them all.
|
||||
rm[hash] = rs
|
||||
rightSigs[sig] = rs
|
||||
}
|
||||
|
||||
// Tracks the match-signature. For one-to-one operations the value is nil. For many-to-one
|
||||
// the value is a set of signatures to detect duplicated result elements.
|
||||
matchedSigs := map[uint64]map[uint64]struct{}{}
|
||||
|
||||
// For all lhs samples find a respective rhs sample and perform
|
||||
// the binary operation.
|
||||
for _, ls := range lhs {
|
||||
hash := hashForMetric(ls.Metric.Metric, matching.On)
|
||||
// Any lhs sample we encounter in an OR operation belongs to the result.
|
||||
if op == itemLOR {
|
||||
ls.Metric = resultMetric(op, ls, nil, matching)
|
||||
result = append(result, ls)
|
||||
added[hash] = nil // Ensure matching rhs sample is not added later.
|
||||
continue
|
||||
}
|
||||
sig := sigf(ls.Metric)
|
||||
|
||||
rs, found := rm[hash] // Look for a match in the rhs vector.
|
||||
rs, found := rightSigs[sig] // Look for a match in the rhs vector.
|
||||
if !found {
|
||||
continue
|
||||
}
|
||||
var value clientmodel.SampleValue
|
||||
var keep bool
|
||||
|
||||
if op == itemLAND {
|
||||
value = ls.Value
|
||||
keep = true
|
||||
// Account for potentially swapped sidedness.
|
||||
vl, vr := ls.Value, rs.Value
|
||||
if matching.Card == CardOneToMany {
|
||||
vl, vr = vr, vl
|
||||
}
|
||||
value, keep := vectorElemBinop(op, vl, vr)
|
||||
if !keep {
|
||||
continue
|
||||
}
|
||||
metric := resultMetric(ls.Metric, op, resultLabels...)
|
||||
|
||||
insertedSigs, exists := matchedSigs[sig]
|
||||
if matching.Card == CardOneToOne {
|
||||
if exists {
|
||||
ev.errorf("multiple matches for labels: many-to-one matching must be explicit (group_left/group_right)")
|
||||
}
|
||||
matchedSigs[sig] = nil // Set existance to true.
|
||||
} else {
|
||||
if _, exists := added[hash]; matching.Card == CardOneToOne && exists {
|
||||
// Many-to-one matching must be explicit.
|
||||
ev.errorf("many-to-one matching must be explicit")
|
||||
// In many-to-one matching the grouping labels have to ensure a unique metric
|
||||
// for the result vector. Check whether those labels have already been added for
|
||||
// the same matching labels.
|
||||
insertSig := clientmodel.SignatureForLabels(metric.Metric, matching.Include)
|
||||
if !exists {
|
||||
insertedSigs = map[uint64]struct{}{}
|
||||
matchedSigs[sig] = insertedSigs
|
||||
} else if _, duplicate := insertedSigs[insertSig]; duplicate {
|
||||
ev.errorf("multiple matches for labels: grouping labels must ensure unique matches")
|
||||
}
|
||||
// Account for potentially swapped sidedness.
|
||||
vl, vr := ls.Value, rs.Value
|
||||
if matching.Card == CardOneToMany {
|
||||
vl, vr = vr, vl
|
||||
}
|
||||
value, keep = vectorElemBinop(op, vl, vr)
|
||||
insertedSigs[insertSig] = struct{}{}
|
||||
}
|
||||
|
||||
if keep {
|
||||
metric := resultMetric(op, ls, rs, matching)
|
||||
// Check if the same label set has been added for a many-to-one matching before.
|
||||
if matching.Card == CardManyToOne || matching.Card == CardOneToMany {
|
||||
insHash := clientmodel.SignatureForLabels(metric.Metric, matching.Include)
|
||||
if ihs, exists := added[hash]; exists {
|
||||
for _, ih := range ihs {
|
||||
if ih == insHash {
|
||||
ev.errorf("metric with label set has already been matched")
|
||||
}
|
||||
}
|
||||
added[hash] = append(ihs, insHash)
|
||||
} else {
|
||||
added[hash] = []uint64{insHash}
|
||||
}
|
||||
}
|
||||
ns := &Sample{
|
||||
Metric: metric,
|
||||
Value: value,
|
||||
Timestamp: ev.Timestamp,
|
||||
}
|
||||
result = append(result, ns)
|
||||
added[hash] = added[hash] // Set existance to true.
|
||||
}
|
||||
}
|
||||
|
||||
// Add all remaining samples in the rhs in an OR operation if they
|
||||
// have not been matched up with a lhs sample.
|
||||
if op == itemLOR {
|
||||
for hash, rs := range rm {
|
||||
if _, exists := added[hash]; !exists {
|
||||
rs.Metric = resultMetric(op, rs, nil, matching)
|
||||
result = append(result, rs)
|
||||
}
|
||||
}
|
||||
result = append(result, &Sample{
|
||||
Metric: metric,
|
||||
Value: value,
|
||||
Timestamp: ev.Timestamp,
|
||||
})
|
||||
}
|
||||
return result
|
||||
}
|
||||
|
||||
// signatureFunc returns a function that calculates the signature for a metric
|
||||
// based on the provided labels.
|
||||
func signatureFunc(labels ...clientmodel.LabelName) func(m clientmodel.COWMetric) uint64 {
|
||||
if len(labels) == 0 {
|
||||
return func(m clientmodel.COWMetric) uint64 {
|
||||
m.Delete(clientmodel.MetricNameLabel)
|
||||
return uint64(m.Metric.Fingerprint())
|
||||
}
|
||||
}
|
||||
return func(m clientmodel.COWMetric) uint64 {
|
||||
return clientmodel.SignatureForLabels(m.Metric, labels)
|
||||
}
|
||||
}
|
||||
|
||||
// resultMetric returns the metric for the given sample(s) based on the vector
|
||||
// binary operation and the matching options.
|
||||
func resultMetric(met clientmodel.COWMetric, op itemType, labels ...clientmodel.LabelName) clientmodel.COWMetric {
|
||||
if len(labels) == 0 {
|
||||
if shouldDropMetricName(op) {
|
||||
met.Delete(clientmodel.MetricNameLabel)
|
||||
}
|
||||
return met
|
||||
}
|
||||
// As we definitly write, creating a new metric is the easiest solution.
|
||||
m := clientmodel.Metric{}
|
||||
for _, ln := range labels {
|
||||
// Included labels from the `group_x` modifier are taken from the "many"-side.
|
||||
if v, ok := met.Metric[ln]; ok {
|
||||
m[ln] = v
|
||||
}
|
||||
}
|
||||
return clientmodel.COWMetric{Metric: m, Copied: false}
|
||||
}
|
||||
|
||||
// vectorScalarBinop evaluates a binary operation between a vector and a scalar.
|
||||
func (ev *evaluator) vectorScalarBinop(op itemType, lhs Vector, rhs *Scalar, swap bool) Vector {
|
||||
vector := make(Vector, 0, len(lhs))
|
||||
|
@ -1018,64 +1086,6 @@ func shouldDropMetricName(op itemType) bool {
|
|||
}
|
||||
}
|
||||
|
||||
// resultMetric returns the metric for the given sample(s) based on the vector
|
||||
// binary operation and the matching options.
|
||||
func resultMetric(op itemType, ls, rs *Sample, matching *VectorMatching) clientmodel.COWMetric {
|
||||
if len(matching.On) == 0 || op == itemLOR || op == itemLAND {
|
||||
if shouldDropMetricName(op) {
|
||||
ls.Metric.Delete(clientmodel.MetricNameLabel)
|
||||
}
|
||||
return ls.Metric
|
||||
}
|
||||
|
||||
m := clientmodel.Metric{}
|
||||
for _, ln := range matching.On {
|
||||
m[ln] = ls.Metric.Metric[ln]
|
||||
}
|
||||
|
||||
for _, ln := range matching.Include {
|
||||
// Included labels from the `group_x` modifier are taken from the "many"-side.
|
||||
v, ok := ls.Metric.Metric[ln]
|
||||
if ok {
|
||||
m[ln] = v
|
||||
}
|
||||
}
|
||||
return clientmodel.COWMetric{false, m}
|
||||
}
|
||||
|
||||
// hashForMetric calculates a hash value for the given metric based on the matching
|
||||
// options for the binary operation.
|
||||
func hashForMetric(metric clientmodel.Metric, withLabels clientmodel.LabelNames) uint64 {
|
||||
var labels clientmodel.LabelNames
|
||||
|
||||
if len(withLabels) > 0 {
|
||||
var match bool
|
||||
for _, ln := range withLabels {
|
||||
if _, match = metric[ln]; !match {
|
||||
break
|
||||
}
|
||||
}
|
||||
// If the metric does not contain the labels to match on, build the hash
|
||||
// over the whole metric to give it a unique hash.
|
||||
if !match {
|
||||
labels = make(clientmodel.LabelNames, 0, len(metric))
|
||||
for ln := range metric {
|
||||
labels = append(labels, ln)
|
||||
}
|
||||
} else {
|
||||
labels = withLabels
|
||||
}
|
||||
} else {
|
||||
labels = make(clientmodel.LabelNames, 0, len(metric))
|
||||
for ln := range metric {
|
||||
if ln != clientmodel.MetricNameLabel {
|
||||
labels = append(labels, ln)
|
||||
}
|
||||
}
|
||||
}
|
||||
return clientmodel.SignatureForLabels(metric, labels)
|
||||
}
|
||||
|
||||
// chooseClosestSample chooses the closest sample of a list of samples
|
||||
// surrounding a given target time. If samples are found both before and after
|
||||
// the target time, the sample value is interpolated between these. Otherwise,
|
||||
|
|
Loading…
Reference in New Issue