prometheus/cmd/promtool/rules.go

218 lines
7.0 KiB
Go
Raw Normal View History

// Copyright 2020 The Prometheus Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package main
import (
"context"
"fmt"
"time"
"github.com/go-kit/kit/log"
"github.com/go-kit/kit/log/level"
"github.com/pkg/errors"
v1 "github.com/prometheus/client_golang/api/prometheus/v1"
"github.com/prometheus/common/model"
"github.com/prometheus/prometheus/pkg/labels"
"github.com/prometheus/prometheus/pkg/timestamp"
"github.com/prometheus/prometheus/rules"
"github.com/prometheus/prometheus/storage"
"github.com/prometheus/prometheus/tsdb"
tsdb_errors "github.com/prometheus/prometheus/tsdb/errors"
)
// maxSamplesInMemory is the threshold newMultipleAppender installs on every
// multipleAppender: once that many samples have been appended since the last
// commit, the appender commits them and starts over with a fresh appender.
const maxSamplesInMemory = 5000
// queryRangeAPI is the single range-query call the rule importer needs from an
// API client. Its signature is expressed in terms of the Prometheus v1 client
// types (v1.Range, v1.Warnings), so any value offering a matching QueryRange
// method can be passed to newRuleImporter.
type queryRangeAPI interface {
// QueryRange evaluates query over the range r and returns the resulting
// value, any warnings, and an error.
QueryRange(ctx context.Context, query string, r v1.Range) (model.Value, v1.Warnings, error)
}
// ruleImporter loads recording rule groups from files, evaluates their rules
// through a range-query API and writes the results to disk as TSDB blocks.
type ruleImporter struct {
// logger receives progress and warning messages.
logger log.Logger
// config holds the output directory, time range and evaluation interval.
config ruleImporterConfig
// apiClient executes the range queries used to evaluate rules.
apiClient queryRangeAPI
// groups holds the rule groups returned by ruleManager.LoadGroups; it is
// nil until loadGroups succeeds.
groups map[string]*rules.Group
// ruleManager is only used here to parse rule files into groups.
ruleManager *rules.Manager
}
// ruleImporterConfig carries the user-supplied settings of a ruleImporter.
type ruleImporterConfig struct {
// outputDir is the directory handed to the TSDB block writer.
outputDir string
// start is the beginning of the time range to import; importAll passes it
// through each group's EvalTimestamp before using it.
start time.Time
// end is the end of the time range to import.
end time.Time
// evalInterval is passed to the rule manager when loading groups
// (presumably the default for groups that set no interval — verify
// against rules.Manager.LoadGroups).
evalInterval time.Duration
}
// newRuleImporter builds a ruleImporter from the given logger, configuration
// and range-query client. The importer gets its own rules.Manager, created
// with empty options, which loadGroups later uses to parse rule files.
func newRuleImporter(logger log.Logger, config ruleImporterConfig, apiClient queryRangeAPI) *ruleImporter {
	imp := new(ruleImporter)
	imp.logger = logger
	imp.config = config
	imp.apiClient = apiClient
	imp.ruleManager = rules.NewManager(&rules.ManagerOptions{})
	return imp
}
// loadGroups asks the rule manager to load the rule groups defined in
// filenames, using the configured evaluation interval and no external labels.
// On success the groups are stored on the importer and nil is returned;
// otherwise the loader's errors are returned and the importer is left as is.
// ctx is currently unused.
func (importer *ruleImporter) loadGroups(ctx context.Context, filenames []string) (errs []error) {
	loaded, loadErrs := importer.ruleManager.LoadGroups(importer.config.evalInterval, labels.Labels{}, filenames...)
	if loadErrs == nil {
		importer.groups = loaded
		return nil
	}
	return loadErrs
}
// importAll runs importRule for every rule of every loaded group. For each
// group the configured start time is first passed through the group's
// EvalTimestamp, and the result is used as the start for all of its rules.
// A failing rule does not stop the run: its error is collected and the
// remaining rules are still processed. The collected errors are returned
// (nil when every rule succeeded).
func (importer *ruleImporter) importAll(ctx context.Context) (errs []error) {
	for groupName, grp := range importer.groups {
		level.Info(importer.logger).Log("backfiller", fmt.Sprintf("processing group, name: %s", groupName))
		alignedStart := grp.EvalTimestamp(importer.config.start.UnixNano())

		for idx, rule := range grp.Rules() {
			level.Info(importer.logger).Log("backfiller", fmt.Sprintf("processing rule %d, name: %s", idx+1, rule.Name()))

			err := importer.importRule(ctx, rule.Query().String(), rule.Name(), rule.Labels(), alignedStart, importer.config.end, grp.Interval())
			if err == nil {
				continue
			}
			errs = append(errs, err)
		}
	}
	return errs
}
// importRule backfills a single recording rule: it evaluates ruleExpr through
// the range-query API between start and end and writes the resulting samples
// to disk, one TSDB block per block-sized time window.
//
// start is expected to lie on the rule group's evaluation grid (importAll
// passes the value returned by the group's EvalTimestamp). The query of every
// window begins on start+k*interval, so the same grid is kept across blocks.
// Queries never reach before start or past end, and two neighbouring windows
// never evaluate the same timestamp.
//
// It returns an error if interval is shorter than one millisecond, or the
// first error hit while querying or writing a window; later windows are then
// not processed.
func (importer *ruleImporter) importRule(ctx context.Context, ruleExpr, ruleName string, ruleLabels labels.Labels, start, end time.Time, interval time.Duration) (err error) {
	// Block boundaries and sample timestamps are millisecond values
	// (tsdb.DefaultBlockDuration is expressed in milliseconds), so all window
	// arithmetic is done in milliseconds rather than seconds.
	stepMs := int64(interval / time.Millisecond)
	if stepMs <= 0 {
		return errors.Errorf("invalid evaluation interval %s", interval)
	}
	blockDuration := tsdb.DefaultBlockDuration
	startMs := timestamp.FromTime(start)
	endMs := timestamp.FromTime(end)

	for blockStart := blockDuration * (startMs / blockDuration); blockStart <= endMs; blockStart += blockDuration {
		// Last millisecond that still belongs to this window. Range queries
		// include their end point, so stopping one millisecond short of the
		// next window keeps the boundary timestamp out of this block.
		blockEnd := blockStart + blockDuration - 1

		// First evaluation time inside the window: start itself for the first
		// window, otherwise the first grid point at or after the window start.
		firstEval := startMs
		if blockStart > startMs {
			steps := (blockStart - startMs + stepMs - 1) / stepMs
			firstEval = startMs + steps*stepMs
		}
		lastEval := blockEnd
		if endMs < lastEval {
			lastEval = endMs
		}
		if firstEval > lastEval {
			// No evaluation time falls into this window.
			continue
		}

		if err := importer.importRuleBlock(ctx, ruleExpr, ruleName, ruleLabels, timestamp.Time(firstEval), timestamp.Time(lastEval), interval); err != nil {
			return err
		}
	}
	return nil
}

// importRuleBlock evaluates ruleExpr over [queryStart, queryEnd] and writes the
// result through a dedicated block writer, which is closed before returning.
func (importer *ruleImporter) importRuleBlock(ctx context.Context, ruleExpr, ruleName string, ruleLabels labels.Labels, queryStart, queryEnd time.Time, interval time.Duration) (err error) {
	val, warnings, err := importer.apiClient.QueryRange(ctx,
		ruleExpr,
		v1.Range{
			Start: queryStart,
			End:   queryEnd,
			Step:  interval,
		},
	)
	if err != nil {
		return errors.Wrap(err, "query range")
	}
	if warnings != nil {
		level.Warn(importer.logger).Log("backfiller", fmt.Sprintf("warnings QueryRange api: %v", warnings))
	}

	// Reject unusable results before a block writer is created for them.
	if val.Type() != model.ValMatrix {
		return errors.Errorf("rule result is wrong type %s", val.Type().String())
	}
	matrix := val.(model.Matrix)

	// To prevent races with compaction, a block writer only allows appending samples
	// that are at most half a block size older than the most recent sample appended so far.
	// However, in the way we use the block writer here, compaction doesn't happen, while we
	// also need to append samples throughout the whole block range. To allow that, we
	// pretend that the block is twice as large here, but only really add samples in the
	// original interval later.
	w, err := tsdb.NewBlockWriter(log.NewNopLogger(), importer.config.outputDir, 2*tsdb.DefaultBlockDuration)
	if err != nil {
		return errors.Wrap(err, "new block writer")
	}
	// err is this function's named result, so a failure to close the writer
	// is reported to the caller together with any earlier error.
	defer func() {
		err = tsdb_errors.NewMulti(err, w.Close()).Err()
	}()

	app := newMultipleAppender(ctx, w)
	for _, series := range matrix {
		// The builder yields a sorted label set without duplicate names.
		// Labels are applied in increasing precedence: labels of the query
		// result, then the rule's own labels, then the rule name as the
		// metric name.
		lb := labels.NewBuilder(labels.Labels{})
		for name, value := range series.Metric {
			lb.Set(string(name), string(value))
		}
		for _, l := range ruleLabels {
			lb.Set(l.Name, l.Value)
		}
		lb.Set(labels.MetricName, ruleName)
		seriesLabels := lb.Labels()

		for _, point := range series.Values {
			if err := app.add(ctx, seriesLabels, timestamp.FromTime(point.Timestamp.Time()), float64(point.Value)); err != nil {
				return errors.Wrap(err, "add")
			}
		}
	}

	if err := app.flushAndCommit(ctx); err != nil {
		return errors.Wrap(err, "flush and commit")
	}
	return nil
}
// newMultipleAppender returns a multipleAppender that writes through
// blockWriter. It starts with an appender obtained from blockWriter and uses
// the package-level maxSamplesInMemory as its commit threshold.
func newMultipleAppender(ctx context.Context, blockWriter *tsdb.BlockWriter) *multipleAppender {
	m := new(multipleAppender)
	m.maxSamplesInMemory = maxSamplesInMemory
	m.writer = blockWriter
	m.appender = blockWriter.Appender(ctx)
	return m
}
// multipleAppender keeps track of how many samples have been added to the current appender.
// Once maxSamplesInMemory samples have been added, they are committed and a new
// appender is obtained from the block writer.
type multipleAppender struct {
// maxSamplesInMemory is the number of appended samples that triggers a commit.
maxSamplesInMemory int
// currentSampleCount counts the samples appended since the last commit.
currentSampleCount int
// writer supplies new appenders and is flushed by flushAndCommit.
writer *tsdb.BlockWriter
// appender is the appender currently receiving samples.
appender storage.Appender
}
// add appends the sample (t, v) for the series identified by l to the current
// appender. When the number of samples appended since the last commit reaches
// maxSamplesInMemory, the pending samples are committed right away.
func (m *multipleAppender) add(ctx context.Context, l labels.Labels, t int64, v float64) error {
	_, appendErr := m.appender.Append(0, l, t, v)
	if appendErr != nil {
		return errors.Wrap(appendErr, "multiappender append")
	}

	m.currentSampleCount++
	if m.currentSampleCount < m.maxSamplesInMemory {
		return nil
	}
	return m.commit(ctx)
}
// commit commits the samples appended since the last commit, then replaces
// the appender with a fresh one from the writer and resets the sample count.
// It does nothing when no samples are pending. If the commit fails, the
// appender and the sample count are left unchanged.
func (m *multipleAppender) commit(ctx context.Context) error {
	if m.currentSampleCount != 0 {
		commitErr := m.appender.Commit()
		if commitErr != nil {
			return errors.Wrap(commitErr, "multiappender commit")
		}
		m.appender = m.writer.Appender(ctx)
		m.currentSampleCount = 0
	}
	return nil
}
// flushAndCommit commits any pending samples and then flushes the block
// writer. A commit error is returned unchanged; a flush error is wrapped.
func (m *multipleAppender) flushAndCommit(ctx context.Context) error {
	err := m.commit(ctx)
	if err != nil {
		return err
	}

	_, err = m.writer.Flush(ctx)
	if err != nil {
		return errors.Wrap(err, "multiappender flush")
	}
	return nil
}