diff --git a/tsdb/head.go b/tsdb/head.go index 435dfaf16..54d889799 100644 --- a/tsdb/head.go +++ b/tsdb/head.go @@ -564,6 +564,21 @@ func (h *Head) Init(minValidTime int64) error { h.updateWALReplayStatusRead(i) } + { + // Set the sparseHistogramSeries metric once replay is done. + // This is a temporary hack. + // TODO: remove this hack and do it while replaying WAL if we keep this metric around. + sparseHistogramSeries := 0 + for _, m := range h.series.series { + for _, ms := range m { + if ms.sparseHistogramSeries { + sparseHistogramSeries++ + } + } + } + h.metrics.sparseHistogramSeries.Set(float64(sparseHistogramSeries)) + } + walReplayDuration := time.Since(start) h.metrics.walTotalReplayDuration.Set(walReplayDuration.Seconds()) level.Info(h.logger).Log( diff --git a/tsdb/head_append.go b/tsdb/head_append.go index 0d0ec2f3e..52b53e8ba 100644 --- a/tsdb/head_append.go +++ b/tsdb/head_append.go @@ -629,6 +629,7 @@ func (s *memSeries) appendHistogram(t int64, sh histogram.SparseHistogram, appen } s.app.AppendHistogram(t, sh) + s.sparseHistogramSeries = true c.maxTime = t diff --git a/tsdb/head_test.go b/tsdb/head_test.go index f215cef84..9b9dbc749 100644 --- a/tsdb/head_test.go +++ b/tsdb/head_test.go @@ -2770,3 +2770,38 @@ func TestChunkSnapshot(t *testing.T) { require.Equal(t, expTombstones, actTombstones) } } + +func TestSparseHistogramMetrics(t *testing.T) { + head, _ := newTestHead(t, 1000, false) + t.Cleanup(func() { + require.NoError(t, head.Close()) + }) + require.NoError(t, head.Init(0)) + + expHistSeries, expHistSamples := 0, 0 + + for x := 0; x < 5; x++ { + expHistSeries++ + l := labels.Labels{{Name: "a", Value: fmt.Sprintf("b%d", x)}} + for i, h := range generateHistograms(10) { + app := head.Appender(context.Background()) + _, err := app.AppendHistogram(0, l, int64(i), h) + require.NoError(t, err) + require.NoError(t, app.Commit()) + expHistSamples++ + } + } + + require.Equal(t, float64(expHistSeries), prom_testutil.ToFloat64(head.metrics.sparseHistogramSeries)) + require.Equal(t, float64(expHistSamples), prom_testutil.ToFloat64(head.metrics.sparseHistogramSamplesTotal)) + + require.NoError(t, head.Close()) + w, err := wal.NewSize(nil, nil, head.wal.Dir(), 32768, false) + require.NoError(t, err) + head, err = NewHead(nil, nil, w, head.opts, nil) + require.NoError(t, err) + require.NoError(t, head.Init(0)) + + require.Equal(t, float64(expHistSeries), prom_testutil.ToFloat64(head.metrics.sparseHistogramSeries)) + require.Equal(t, float64(0), prom_testutil.ToFloat64(head.metrics.sparseHistogramSamplesTotal)) // Counter reset. +} diff --git a/tsdb/head_wal.go b/tsdb/head_wal.go index 65a3482d1..0ff2c275c 100644 --- a/tsdb/head_wal.go +++ b/tsdb/head_wal.go @@ -154,6 +154,11 @@ func (h *Head) loadWAL(r *wal.Reader, multiRef map[uint64]uint64, mmappedChunks continue } + if ms.head() == nil { + // First histogram for the series. Count this in metrics. + ms.sparseHistogramSeries = true + } + if rh.T < h.minValidTime.Load() { continue } diff --git a/web/api/v1/api.go b/web/api/v1/api.go index d3b9580b6..4551bdc1d 100644 --- a/web/api/v1/api.go +++ b/web/api/v1/api.go @@ -40,7 +40,6 @@ import ( "github.com/prometheus/prometheus/config" "github.com/prometheus/prometheus/pkg/exemplar" - "github.com/prometheus/prometheus/pkg/histogram" "github.com/prometheus/prometheus/pkg/labels" "github.com/prometheus/prometheus/pkg/textparse" "github.com/prometheus/prometheus/pkg/timestamp" @@ -345,64 +344,59 @@ func (api *API) options(r *http.Request) apiFuncResult { } func (api *API) query(r *http.Request) (result apiFuncResult) { - //ts, err := parseTimeParam(r, "time", api.now()) - //if err != nil { - // return invalidParamError(err, "time") - //} - //ctx := r.Context() - //if to := r.FormValue("timeout"); to != "" { - // var cancel context.CancelFunc - // timeout, err := parseDuration(to) - // if err != nil { - // return invalidParamError(err, "timeout") - // } - // - // ctx, cancel = context.WithTimeout(ctx, timeout) - // defer cancel() - //} - // - ////qry, err := api.QueryEngine.NewInstantQuery(api.Queryable, r.FormValue("query"), ts) - ////if err == promql.ErrValidationAtModifierDisabled { - //// err = errors.New("@ modifier is disabled, use --enable-feature=promql-at-modifier to enable it") - ////} else if err == promql.ErrValidationNegativeOffsetDisabled { - //// err = errors.New("negative offset is disabled, use --enable-feature=promql-negative-offset to enable it") - ////} - ////if err != nil { - //// return invalidParamError(err, "query") - ////} - //// - ////// From now on, we must only return with a finalizer in the result (to - ////// be called by the caller) or call qry.Close ourselves (which is - ////// required in the case of a panic). - ////defer func() { - //// if result.finalizer == nil { - //// qry.Close() - //// } - ////}() - //// - ////ctx = httputil.ContextFromRequest(ctx, r) - //// - ////res := qry.Exec(ctx) - ////if res.Err != nil { - //// return apiFuncResult{nil, returnAPIError(res.Err), res.Warnings, qry.Close} - ////} - //// - ////// Optional stats field in response if parameter "stats" is not empty. - ////var qs *stats.QueryStats - ////if r.FormValue("stats") != "" { - //// qs = stats.NewQueryStats(qry.Stats()) - ////} - //// - ////return apiFuncResult{&queryData{ - //// ResultType: res.Value.Type(), - //// Result: res.Value, - //// Stats: qs, - ////}, nil, res.Warnings, qry.Close} + ts, err := parseTimeParam(r, "time", api.now()) + if err != nil { + return invalidParamError(err, "time") + } + ctx := r.Context() + if to := r.FormValue("timeout"); to != "" { + var cancel context.CancelFunc + timeout, err := parseDuration(to) + if err != nil { + return invalidParamError(err, "timeout") + } + + ctx, cancel = context.WithTimeout(ctx, timeout) + defer cancel() + } + + qry, err := api.QueryEngine.NewInstantQuery(api.Queryable, r.FormValue("query"), ts) + if err == promql.ErrValidationAtModifierDisabled { + err = errors.New("@ modifier is disabled, use --enable-feature=promql-at-modifier to enable it") + } else if err == promql.ErrValidationNegativeOffsetDisabled { + err = errors.New("negative offset is disabled, use --enable-feature=promql-negative-offset to enable it") + } + if err != nil { + return invalidParamError(err, "query") + } + + // From now on, we must only return with a finalizer in the result (to + // be called by the caller) or call qry.Close ourselves (which is + // required in the case of a panic). + defer func() { + if result.finalizer == nil { + qry.Close() + } + }() + + ctx = httputil.ContextFromRequest(ctx, r) + + res := qry.Exec(ctx) + if res.Err != nil { + return apiFuncResult{nil, returnAPIError(res.Err), res.Warnings, qry.Close} + } + + // Optional stats field in response if parameter "stats" is not empty. + var qs *stats.QueryStats + if r.FormValue("stats") != "" { + qs = stats.NewQueryStats(qry.Stats()) + } return apiFuncResult{&queryData{ - ResultType: parser.ValueTypeVector, - Result: promql.Vector{}, - }, nil, nil, nil} + ResultType: res.Value.Type(), + Result: res.Value, + Stats: qs, + }, nil, res.Warnings, qry.Close} } func (api *API) queryRange(r *http.Request) (result apiFuncResult) { @@ -446,182 +440,226 @@ func (api *API) queryRange(r *http.Request) (result apiFuncResult) { defer cancel() } - //qry, err := api.QueryEngine.NewRangeQuery(api.Queryable, r.FormValue("query"), start, end, step) - //if err == promql.ErrValidationAtModifierDisabled { - // err = errors.New("@ modifier is disabled, use --enable-feature=promql-at-modifier to enable it") - //} else if err == promql.ErrValidationNegativeOffsetDisabled { - // err = errors.New("negative offset is disabled, use --enable-feature=promql-negative-offset to enable it") - //} - //if err != nil { - // return apiFuncResult{nil, &apiError{errorBadData, err}, nil, nil} - //} - //// From now on, we must only return with a finalizer in the result (to - //// be called by the caller) or call qry.Close ourselves (which is - //// required in the case of a panic). - //defer func() { - // if result.finalizer == nil { - // qry.Close() - // } - //}() - // - //ctx = httputil.ContextFromRequest(ctx, r) - // - //res := qry.Exec(ctx) - //if res.Err != nil { - // return apiFuncResult{nil, returnAPIError(res.Err), res.Warnings, qry.Close} - //} - // - //// Optional stats field in response if parameter "stats" is not empty. - //var qs *stats.QueryStats - //if r.FormValue("stats") != "" { - // qs = stats.NewQueryStats(qry.Stats()) - //} - // - //return apiFuncResult{&queryData{ - // ResultType: res.Value.Type(), - // Result: res.Value, - // Stats: qs, - //}, nil, res.Warnings, qry.Close} - - expr, err := parser.ParseExpr(r.FormValue("query")) + qry, err := api.QueryEngine.NewRangeQuery(api.Queryable, r.FormValue("query"), start, end, step) + if err == promql.ErrValidationAtModifierDisabled { + err = errors.New("@ modifier is disabled, use --enable-feature=promql-at-modifier to enable it") + } else if err == promql.ErrValidationNegativeOffsetDisabled { + err = errors.New("negative offset is disabled, use --enable-feature=promql-negative-offset to enable it") + } if err != nil { return apiFuncResult{nil, &apiError{errorBadData, err}, nil, nil} } - - selectors := parser.ExtractSelectors(expr) - if len(selectors) < 1 { - return apiFuncResult{nil, nil, nil, nil} - } - - if len(selectors) > 1 { - return apiFuncResult{nil, &apiError{errorBadData, errors.New("need exactly 1 selector")}, nil, nil} - } - - hasRate, rateDuration := false, time.Duration(0) - parser.Inspect(expr, func(node parser.Node, path []parser.Node) error { - switch n := node.(type) { - case *parser.Call: - if n.Func.Name == "rate" { - hasRate = true - rateDuration = n.Args[0].(*parser.MatrixSelector).Range - return errors.New("stop it here") - } + // From now on, we must only return with a finalizer in the result (to + // be called by the caller) or call qry.Close ourselves (which is + // required in the case of a panic). + defer func() { + if result.finalizer == nil { + qry.Close() } - return nil - }) - var numRateSamples int - if hasRate { - numRateSamples = int(end.Sub(start)/step + 1) - if start.Add(time.Duration(numRateSamples-1) * step).After(end) { - numRateSamples-- - } - start = start.Add(-rateDuration) // Adjusting for the first point lookback. + }() + + ctx = httputil.ContextFromRequest(ctx, r) + + res := qry.Exec(ctx) + if res.Err != nil { + return apiFuncResult{nil, returnAPIError(res.Err), res.Warnings, qry.Close} } - q, err := api.Queryable.Querier(ctx, timestamp.FromTime(start), timestamp.FromTime(end)) - if err != nil { - return apiFuncResult{nil, &apiError{errorExec, err}, nil, nil} + // Optional stats field in response if parameter "stats" is not empty. + var qs *stats.QueryStats + if r.FormValue("stats") != "" { + qs = stats.NewQueryStats(qry.Stats()) } - res := promql.Matrix{} - ss := q.Select(true, nil, selectors[0]...) - - for ss.Next() { - resSeries := make(map[float64]promql.Series) // le -> series. - - s := ss.At() - it := s.Iterator() - for it.Next() { // Per histogram. - t, h := it.AtHistogram() - buckets := histogram.CumulativeExpandSparseHistogram(h) - for buckets.Next() { - // Every bucket is a different series with different "le". - b := buckets.At() - rs, ok := resSeries[b.Le] - if !ok { - rs = promql.Series{ - Metric: append( - s.Labels(), - labels.Label{Name: "le", Value: fmt.Sprintf("%.16f", b.Le)}, // TODO: Set some precision for 'le'? - ), - } - sort.Sort(rs.Metric) - resSeries[b.Le] = rs - } - - rs.Points = append(rs.Points, promql.Point{ - T: t, - V: float64(b.Count), - }) - resSeries[b.Le] = rs - } - if buckets.Err() != nil { - return apiFuncResult{nil, &apiError{errorExec, buckets.Err()}, nil, nil} - } - } - - for _, rs := range resSeries { - res = append(res, rs) - } - } - - if hasRate { - newRes := make(promql.Matrix, len(res)) - for i := range newRes { - newRes[i].Metric = res[i].Metric - points := make([]promql.Point, numRateSamples) - - rawPoints := res[i].Points - - startIdx, endIdx := 0, 0 - for idx := range points { - pointTime := start.Add(time.Duration(idx) * step) - lookbackTime := pointTime.Add(-rateDuration) - points[idx].T = timestamp.FromTime(pointTime) - if len(rawPoints) == 0 { - continue - } - - for startIdx < len(rawPoints) && timestamp.Time(rawPoints[startIdx].T).Before(lookbackTime) { - startIdx++ - } - if startIdx >= len(rawPoints) { - startIdx = len(rawPoints) - 1 - } - - for endIdx < len(rawPoints) && timestamp.Time(rawPoints[endIdx].T).Before(pointTime) { - endIdx++ - } - if endIdx >= len(rawPoints) { - endIdx = len(rawPoints) - 1 - } else if timestamp.Time(rawPoints[endIdx].T).After(pointTime) && (len(rawPoints) == 1 || endIdx == 0) { - continue - } else { - endIdx-- - } - - valDiff := rawPoints[endIdx].V - rawPoints[startIdx].V - timeDiffSeconds := float64(timestamp.Time(rawPoints[endIdx].T).Sub(timestamp.Time(rawPoints[startIdx].T))) / float64(time.Second) - - if timeDiffSeconds != 0 { - points[idx].V = valDiff / timeDiffSeconds - } - } - - newRes[i].Points = points - } - - res = newRes - } - - sort.Sort(res) - return apiFuncResult{&queryData{ - ResultType: res.Type(), - Result: res, - }, nil, nil, nil} + ResultType: res.Value.Type(), + Result: res.Value, + Stats: qs, + }, nil, res.Warnings, qry.Close} } +// TODO: remove this when we have sparse histogram support in PromQL. +// This is a hack to query sparse histogram for buckets. +//func (api *API) queryRange(r *http.Request) (result apiFuncResult) { +// start, err := parseTime(r.FormValue("start")) +// if err != nil { +// return invalidParamError(err, "start") +// } +// end, err := parseTime(r.FormValue("end")) +// if err != nil { +// return invalidParamError(err, "end") +// } +// if end.Before(start) { +// return invalidParamError(errors.New("end timestamp must not be before start time"), "end") +// } +// +// step, err := parseDuration(r.FormValue("step")) +// if err != nil { +// return invalidParamError(err, "step") +// } +// +// if step <= 0 { +// return invalidParamError(errors.New("zero or negative query resolution step widths are not accepted. Try a positive integer"), "step") +// } +// +// // For safety, limit the number of returned points per timeseries. +// // This is sufficient for 60s resolution for a week or 1h resolution for a year. +// if end.Sub(start)/step > 11000 { +// err := errors.New("exceeded maximum resolution of 11,000 points per timeseries. Try decreasing the query resolution (?step=XX)") +// return apiFuncResult{nil, &apiError{errorBadData, err}, nil, nil} +// } +// +// ctx := r.Context() +// if to := r.FormValue("timeout"); to != "" { +// var cancel context.CancelFunc +// timeout, err := parseDuration(to) +// if err != nil { +// return invalidParamError(err, "timeout") +// } +// +// ctx, cancel = context.WithTimeout(ctx, timeout) +// defer cancel() +// } +// +// expr, err := parser.ParseExpr(r.FormValue("query")) +// if err != nil { +// return apiFuncResult{nil, &apiError{errorBadData, err}, nil, nil} +// } +// +// selectors := parser.ExtractSelectors(expr) +// if len(selectors) < 1 { +// return apiFuncResult{nil, nil, nil, nil} +// } +// +// if len(selectors) > 1 { +// return apiFuncResult{nil, &apiError{errorBadData, errors.New("need exactly 1 selector")}, nil, nil} +// } +// +// hasRate, rateDuration := false, time.Duration(0) +// parser.Inspect(expr, func(node parser.Node, path []parser.Node) error { +// switch n := node.(type) { +// case *parser.Call: +// if n.Func.Name == "rate" { +// hasRate = true +// rateDuration = n.Args[0].(*parser.MatrixSelector).Range +// return errors.New("stop it here") +// } +// } +// return nil +// }) +// var numRateSamples int +// if hasRate { +// numRateSamples = int(end.Sub(start)/step + 1) +// if start.Add(time.Duration(numRateSamples-1) * step).After(end) { +// numRateSamples-- +// } +// start = start.Add(-rateDuration) // Adjusting for the first point lookback. +// } +// +// q, err := api.Queryable.Querier(ctx, timestamp.FromTime(start), timestamp.FromTime(end)) +// if err != nil { +// return apiFuncResult{nil, &apiError{errorExec, err}, nil, nil} +// } +// +// res := promql.Matrix{} +// ss := q.Select(true, nil, selectors[0]...) +// +// for ss.Next() { +// resSeries := make(map[float64]promql.Series) // le -> series. +// +// s := ss.At() +// it := s.Iterator() +// for it.Next() { // Per histogram. +// t, h := it.AtHistogram() +// buckets := histogram.CumulativeExpandSparseHistogram(h) +// for buckets.Next() { +// // Every bucket is a different series with different "le". +// b := buckets.At() +// rs, ok := resSeries[b.Le] +// if !ok { +// rs = promql.Series{ +// Metric: append( +// s.Labels(), +// labels.Label{Name: "le", Value: fmt.Sprintf("%.16f", b.Le)}, // TODO: Set some precision for 'le'? +// ), +// } +// sort.Sort(rs.Metric) +// resSeries[b.Le] = rs +// } +// +// rs.Points = append(rs.Points, promql.Point{ +// T: t, +// V: float64(b.Count), +// }) +// resSeries[b.Le] = rs +// } +// if buckets.Err() != nil { +// return apiFuncResult{nil, &apiError{errorExec, buckets.Err()}, nil, nil} +// } +// } +// +// for _, rs := range resSeries { +// res = append(res, rs) +// } +// } +// +// if hasRate { +// newRes := make(promql.Matrix, len(res)) +// for i := range newRes { +// newRes[i].Metric = res[i].Metric +// points := make([]promql.Point, numRateSamples) +// +// rawPoints := res[i].Points +// +// startIdx, endIdx := 0, 0 +// for idx := range points { +// pointTime := start.Add(time.Duration(idx) * step) +// lookbackTime := pointTime.Add(-rateDuration) +// points[idx].T = timestamp.FromTime(pointTime) +// if len(rawPoints) == 0 { +// continue +// } +// +// for startIdx < len(rawPoints) && timestamp.Time(rawPoints[startIdx].T).Before(lookbackTime) { +// startIdx++ +// } +// if startIdx >= len(rawPoints) { +// startIdx = len(rawPoints) - 1 +// } +// +// for endIdx < len(rawPoints) && timestamp.Time(rawPoints[endIdx].T).Before(pointTime) { +// endIdx++ +// } +// if endIdx >= len(rawPoints) { +// endIdx = len(rawPoints) - 1 +// } else if timestamp.Time(rawPoints[endIdx].T).After(pointTime) && (len(rawPoints) == 1 || endIdx == 0) { +// continue +// } else { +// endIdx-- +// } +// +// valDiff := rawPoints[endIdx].V - rawPoints[startIdx].V +// timeDiffSeconds := float64(timestamp.Time(rawPoints[endIdx].T).Sub(timestamp.Time(rawPoints[startIdx].T))) / float64(time.Second) +// +// if timeDiffSeconds != 0 { +// points[idx].V = valDiff / timeDiffSeconds +// } +// } +// +// newRes[i].Points = points +// } +// +// res = newRes +// } +// +// sort.Sort(res) +// +// return apiFuncResult{&queryData{ +// ResultType: res.Type(), +// Result: res, +// }, nil, nil, nil} +//} + func (api *API) queryExemplars(r *http.Request) apiFuncResult { start, err := parseTimeParam(r, "start", minTime) if err != nil {