promql: export SmapleStream

This commit is contained in:
Fabian Reinartz 2016-12-24 11:29:39 +01:00
parent 6315d00942
commit 65581a3d46
3 changed files with 53 additions and 52 deletions

View File

@ -62,12 +62,13 @@ type ValueType string
// The valid value types. // The valid value types.
const ( const (
ValueTypeNone = "none" ValueTypeNone = "none"
ValueTypeVector = "Vector" ValueTypeVector = "vector"
ValueTypeScalar = "Scalar" ValueTypeScalar = "scalar"
ValueTypeMatrix = "Matrix" ValueTypeMatrix = "matrix"
ValueTypeString = "string" ValueTypeString = "string"
) )
// String represents a string value.
type String struct { type String struct {
V string V string
T int64 T int64
@ -87,13 +88,13 @@ func (s Scalar) String() string {
return "" return ""
} }
// sampleStream is a stream of Values belonging to an attached COWMetric. // SampleStream is a stream of data points belonging to a metric.
type sampleStream struct { type SampleStream struct {
Metric labels.Labels Metric labels.Labels
Values []Point Points []Point
} }
func (s sampleStream) String() string { func (s SampleStream) String() string {
return "" return ""
} }
@ -132,7 +133,7 @@ func (vec Vector) String() string {
// Matrix is a slice of SampleStreams that implements sort.Interface and // Matrix is a slice of SampleStreams that implements sort.Interface and
// has a String method. // has a String method.
type Matrix []sampleStream type Matrix []SampleStream
func (m Matrix) String() string { func (m Matrix) String() string {
// TODO(fabxc): sort, or can we rely on order from the querier? // TODO(fabxc): sort, or can we rely on order from the querier?
@ -456,7 +457,7 @@ func (ng *Engine) execEvalStmt(ctx context.Context, query *query, s *EvalStmt) (
numSteps := int(s.End.Sub(s.Start) / s.Interval) numSteps := int(s.End.Sub(s.Start) / s.Interval)
// Range evaluation. // Range evaluation.
sampleStreams := map[uint64]sampleStream{} SampleStreams := map[uint64]SampleStream{}
for ts := s.Start; !ts.After(s.End); ts = ts.Add(s.Interval) { for ts := s.Start; !ts.After(s.End); ts = ts.Add(s.Interval) {
if err := contextDone(ctx, "range evaluation"); err != nil { if err := contextDone(ctx, "range evaluation"); err != nil {
@ -476,24 +477,24 @@ func (ng *Engine) execEvalStmt(ctx context.Context, query *query, s *EvalStmt) (
case Scalar: case Scalar:
// As the expression type does not change we can safely default to 0 // As the expression type does not change we can safely default to 0
// as the fingerprint for Scalar expressions. // as the fingerprint for Scalar expressions.
ss, ok := sampleStreams[0] ss, ok := SampleStreams[0]
if !ok { if !ok {
ss = sampleStream{Values: make([]Point, 0, numSteps)} ss = SampleStream{Points: make([]Point, 0, numSteps)}
sampleStreams[0] = ss SampleStreams[0] = ss
} }
ss.Values = append(ss.Values, Point(v)) ss.Points = append(ss.Points, Point(v))
case Vector: case Vector:
for _, sample := range v { for _, sample := range v {
h := sample.Metric.Hash() h := sample.Metric.Hash()
ss, ok := sampleStreams[h] ss, ok := SampleStreams[h]
if !ok { if !ok {
ss = sampleStream{ ss = SampleStream{
Metric: sample.Metric, Metric: sample.Metric,
Values: make([]Point, 0, numSteps), Points: make([]Point, 0, numSteps),
} }
sampleStreams[h] = ss SampleStreams[h] = ss
} }
ss.Values = append(ss.Values, sample.Point) ss.Points = append(ss.Points, sample.Point)
} }
default: default:
panic(fmt.Errorf("promql.Engine.exec: invalid expression type %q", val.Type())) panic(fmt.Errorf("promql.Engine.exec: invalid expression type %q", val.Type()))
@ -507,7 +508,7 @@ func (ng *Engine) execEvalStmt(ctx context.Context, query *query, s *EvalStmt) (
appendTimer := query.stats.GetTimer(stats.ResultAppendTime).Start() appendTimer := query.stats.GetTimer(stats.ResultAppendTime).Start()
mat := Matrix{} mat := Matrix{}
for _, ss := range sampleStreams { for _, ss := range SampleStreams {
mat = append(mat, ss) mat = append(mat, ss)
} }
appendTimer.Stop() appendTimer.Stop()
@ -821,9 +822,9 @@ func (ev *evaluator) MatrixSelector(node *MatrixSelector) Matrix {
) )
for i, it := range node.iterators { for i, it := range node.iterators {
ss := sampleStream{ ss := SampleStream{
Metric: node.series[i].Labels(), Metric: node.series[i].Labels(),
Values: make([]Point, 0, 16), Points: make([]Point, 0, 16),
} }
if !it.Seek(maxt) { if !it.Seek(maxt) {
@ -838,13 +839,13 @@ func (ev *evaluator) MatrixSelector(node *MatrixSelector) Matrix {
t, v := buf.Values() t, v := buf.Values()
// Values in the buffer are guaranteed to be smaller than maxt. // Values in the buffer are guaranteed to be smaller than maxt.
if t >= mint { if t >= mint {
ss.Values = append(ss.Values, Point{T: t + offset, V: v}) ss.Points = append(ss.Points, Point{T: t + offset, V: v})
} }
} }
// The seeked sample might also be in the range. // The seeked sample might also be in the range.
t, v := it.Values() t, v := it.Values()
if t == maxt { if t == maxt {
ss.Values = append(ss.Values, Point{T: t + offset, V: v}) ss.Points = append(ss.Points, Point{T: t + offset, V: v})
} }
Matrix = append(Matrix, ss) Matrix = append(Matrix, ss)

View File

@ -58,29 +58,29 @@ func extrapolatedRate(ev *evaluator, arg Expr, isCounter bool, isRate bool) Valu
for _, samples := range MatrixValue { for _, samples := range MatrixValue {
// No sense in trying to compute a rate without at least two points. Drop // No sense in trying to compute a rate without at least two points. Drop
// this Vector element. // this Vector element.
if len(samples.Values) < 2 { if len(samples.Points) < 2 {
continue continue
} }
var ( var (
counterCorrection float64 counterCorrection float64
lastValue float64 lastValue float64
) )
for _, sample := range samples.Values { for _, sample := range samples.Points {
if isCounter && sample.V < lastValue { if isCounter && sample.V < lastValue {
counterCorrection += lastValue counterCorrection += lastValue
} }
lastValue = sample.V lastValue = sample.V
} }
resultValue := lastValue - samples.Values[0].V + counterCorrection resultValue := lastValue - samples.Points[0].V + counterCorrection
// Duration between first/last samples and boundary of range. // Duration between first/last samples and boundary of range.
durationToStart := float64(samples.Values[0].T - rangeStart) durationToStart := float64(samples.Points[0].T - rangeStart)
durationToEnd := float64(rangeEnd - samples.Values[len(samples.Values)-1].T) durationToEnd := float64(rangeEnd - samples.Points[len(samples.Points)-1].T)
sampledInterval := float64(samples.Values[len(samples.Values)-1].T - samples.Values[0].T) sampledInterval := float64(samples.Points[len(samples.Points)-1].T - samples.Points[0].T)
averageDurationBetweenSamples := float64(sampledInterval) / float64(len(samples.Values)-1) averageDurationBetweenSamples := float64(sampledInterval) / float64(len(samples.Points)-1)
if isCounter && resultValue > 0 && samples.Values[0].V >= 0 { if isCounter && resultValue > 0 && samples.Points[0].V >= 0 {
// Counters cannot be negative. If we have any slope at // Counters cannot be negative. If we have any slope at
// all (i.e. resultValue went up), we can extrapolate // all (i.e. resultValue went up), we can extrapolate
// the zero point of the counter. If the duration to the // the zero point of the counter. If the duration to the
@ -88,7 +88,7 @@ func extrapolatedRate(ev *evaluator, arg Expr, isCounter bool, isRate bool) Valu
// take the zero point as the start of the series, // take the zero point as the start of the series,
// thereby avoiding extrapolation to negative counter // thereby avoiding extrapolation to negative counter
// values. // values.
durationToZero := float64(sampledInterval) * float64(samples.Values[0].V/resultValue) durationToZero := float64(sampledInterval) * float64(samples.Points[0].V/resultValue)
if durationToZero < durationToStart { if durationToZero < durationToStart {
durationToStart = durationToZero durationToStart = durationToZero
} }
@ -154,12 +154,12 @@ func instantValue(ev *evaluator, arg Expr, isRate bool) Value {
for _, samples := range ev.evalMatrix(arg) { for _, samples := range ev.evalMatrix(arg) {
// No sense in trying to compute a rate without at least two points. Drop // No sense in trying to compute a rate without at least two points. Drop
// this Vector element. // this Vector element.
if len(samples.Values) < 2 { if len(samples.Points) < 2 {
continue continue
} }
lastSample := samples.Values[len(samples.Values)-1] lastSample := samples.Points[len(samples.Points)-1]
previousSample := samples.Values[len(samples.Values)-2] previousSample := samples.Points[len(samples.Points)-2]
var resultValue float64 var resultValue float64
if isRate && lastSample.V < previousSample.V { if isRate && lastSample.V < previousSample.V {
@ -236,7 +236,7 @@ func funcHoltWinters(ev *evaluator, args Expressions) Value {
var l int var l int
for _, samples := range mat { for _, samples := range mat {
l = len(samples.Values) l = len(samples.Points)
// Can't do the smoothing operation with less than two points. // Can't do the smoothing operation with less than two points.
if l < 2 { if l < 2 {
@ -251,7 +251,7 @@ func funcHoltWinters(ev *evaluator, args Expressions) Value {
} }
// Fill in the d values with the raw values from the input. // Fill in the d values with the raw values from the input.
for i, v := range samples.Values { for i, v := range samples.Points {
d[i] = v.V d[i] = v.V
} }
@ -409,13 +409,13 @@ func aggrOverTime(ev *evaluator, args Expressions, aggrFn func([]Point) float64)
resultVector := Vector{} resultVector := Vector{}
for _, el := range mat { for _, el := range mat {
if len(el.Values) == 0 { if len(el.Points) == 0 {
continue continue
} }
resultVector = append(resultVector, sample{ resultVector = append(resultVector, sample{
Metric: copyLabels(el.Metric, false), Metric: copyLabels(el.Metric, false),
Point: Point{V: aggrFn(el.Values), T: ev.Timestamp}, Point: Point{V: aggrFn(el.Points), T: ev.Timestamp},
}) })
} }
return resultVector return resultVector
@ -489,13 +489,13 @@ func funcQuantileOverTime(ev *evaluator, args Expressions) Value {
resultVector := Vector{} resultVector := Vector{}
for _, el := range mat { for _, el := range mat {
if len(el.Values) == 0 { if len(el.Points) == 0 {
continue continue
} }
el.Metric = copyLabels(el.Metric, false) el.Metric = copyLabels(el.Metric, false)
values := make(VectorByValueHeap, 0, len(el.Values)) values := make(VectorByValueHeap, 0, len(el.Points))
for _, v := range el.Values { for _, v := range el.Points {
values = append(values, sample{Point: Point{V: v.V}}) values = append(values, sample{Point: Point{V: v.V}})
} }
resultVector = append(resultVector, sample{ resultVector = append(resultVector, sample{
@ -659,10 +659,10 @@ func funcDeriv(ev *evaluator, args Expressions) Value {
for _, samples := range mat { for _, samples := range mat {
// No sense in trying to compute a derivative without at least two points. // No sense in trying to compute a derivative without at least two points.
// Drop this Vector element. // Drop this Vector element.
if len(samples.Values) < 2 { if len(samples.Points) < 2 {
continue continue
} }
slope, _ := linearRegression(samples.Values, 0) slope, _ := linearRegression(samples.Points, 0)
resultSample := sample{ resultSample := sample{
Metric: copyLabels(samples.Metric, false), Metric: copyLabels(samples.Metric, false),
Point: Point{V: slope, T: ev.Timestamp}, Point: Point{V: slope, T: ev.Timestamp},
@ -682,10 +682,10 @@ func funcPredictLinear(ev *evaluator, args Expressions) Value {
for _, samples := range mat { for _, samples := range mat {
// No sense in trying to predict anything without at least two points. // No sense in trying to predict anything without at least two points.
// Drop this Vector element. // Drop this Vector element.
if len(samples.Values) < 2 { if len(samples.Points) < 2 {
continue continue
} }
slope, intercept := linearRegression(samples.Values, ev.Timestamp) slope, intercept := linearRegression(samples.Points, ev.Timestamp)
resultVector = append(resultVector, sample{ resultVector = append(resultVector, sample{
Metric: copyLabels(samples.Metric, false), Metric: copyLabels(samples.Metric, false),
@ -743,8 +743,8 @@ func funcResets(ev *evaluator, args Expressions) Value {
for _, samples := range in { for _, samples := range in {
resets := 0 resets := 0
prev := samples.Values[0].V prev := samples.Points[0].V
for _, sample := range samples.Values[1:] { for _, sample := range samples.Points[1:] {
current := sample.V current := sample.V
if current < prev { if current < prev {
resets++ resets++
@ -767,8 +767,8 @@ func funcChanges(ev *evaluator, args Expressions) Value {
for _, samples := range in { for _, samples := range in {
changes := 0 changes := 0
prev := samples.Values[0].V prev := samples.Points[0].V
for _, sample := range samples.Values[1:] { for _, sample := range samples.Points[1:] {
current := sample.V current := sample.V
if current != prev && !(math.IsNaN(float64(current)) && math.IsNaN(float64(prev))) { if current != prev && !(math.IsNaN(float64(current)) && math.IsNaN(float64(prev))) {
changes++ changes++

View File

@ -366,8 +366,8 @@ func (ev *evalCmd) compareResult(result Value) error {
return fmt.Errorf("expected metric %s with %v at position %d but was at %d", v.Metric, exp.vals, exp.pos, pos+1) return fmt.Errorf("expected metric %s with %v at position %d but was at %d", v.Metric, exp.vals, exp.pos, pos+1)
} }
for i, expVal := range exp.vals { for i, expVal := range exp.vals {
if !almostEqual(expVal.value, v.Values[i].V) { if !almostEqual(expVal.value, v.Points[i].V) {
return fmt.Errorf("expected %v for %s but got %v", expVal, v.Metric, v.Values) return fmt.Errorf("expected %v for %s but got %v", expVal, v.Metric, v.Points)
} }
} }
seen[fp] = true seen[fp] = true