prometheus/cmd/promtool/rules.go

212 lines
5.4 KiB
Go

// Copyright 2020 The Prometheus Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package main
import (
"context"
"fmt"
"os"
"time"
"github.com/go-kit/kit/log"
"github.com/pkg/errors"
"github.com/prometheus/client_golang/api"
v1 "github.com/prometheus/client_golang/api/prometheus/v1"
"github.com/prometheus/common/model"
"github.com/prometheus/prometheus/pkg/labels"
"github.com/prometheus/prometheus/rules"
"github.com/prometheus/prometheus/tsdb"
)
const blockSize = 2 // in hours
// ruleImporter is the importer to backfill rules.
type ruleImporter struct {
logger log.Logger
config ruleImporterConfig
groups map[string]*rules.Group
groupLoader rules.GroupLoader
apiClient v1.API
writer *tsdb.BlockWriter
}
// ruleImporterConfig is the config for the rule importer.
type ruleImporterConfig struct {
Start time.Time
End time.Time
OutputDir string
EvalInterval time.Duration
URL string
}
// newRuleImporter creates a new rule importer that can be used to backfill rules.
func newRuleImporter(logger log.Logger, config ruleImporterConfig) *ruleImporter {
return &ruleImporter{
config: config,
groupLoader: rules.FileLoader{},
}
}
// init initializes the rule importer which creates a new block writer
// and creates an Prometheus API client.
func (importer *ruleImporter) init() error {
w, err := tsdb.NewBlockWriter(importer.logger,
importer.config.OutputDir,
(blockSize * time.Hour).Milliseconds(),
)
if err != nil {
return err
}
importer.writer = w
config := api.Config{
Address: importer.config.URL,
}
c, err := api.NewClient(config)
if err != nil {
return err
}
importer.apiClient = v1.NewAPI(c)
return nil
}
// close cleans up any open resources.
func (importer *ruleImporter) close() error {
return importer.writer.Close()
}
// loadGroups reads groups from a list of rule files.
func (importer *ruleImporter) loadGroups(ctx context.Context, filenames []string) (errs []error) {
groups := make(map[string]*rules.Group)
for _, filename := range filenames {
rgs, errs := importer.groupLoader.Load(filename)
if errs != nil {
return errs
}
for _, ruleGroup := range rgs.Groups {
itv := importer.config.EvalInterval
if ruleGroup.Interval != 0 {
itv = time.Duration(ruleGroup.Interval)
}
rgRules := make([]rules.Rule, 0, len(ruleGroup.Rules))
for _, r := range ruleGroup.Rules {
expr, err := importer.groupLoader.Parse(r.Expr.Value)
if err != nil {
return []error{errors.Wrap(err, filename)}
}
rgRules = append(rgRules, rules.NewRecordingRule(
r.Record.Value,
expr,
labels.FromMap(r.Labels),
))
}
groups[rules.GroupKey(filename, ruleGroup.Name)] = rules.NewGroup(rules.GroupOptions{
Name: ruleGroup.Name,
File: filename,
Interval: itv,
Rules: rgRules,
})
}
}
importer.groups = groups
return nil
}
// importAll evaluates all the groups and rules and creates new time series
// and stores them in new blocks.
func (importer *ruleImporter) importAll(ctx context.Context) []error {
var errs = []error{}
for _, group := range importer.groups {
stimeWithAlignment := group.EvalTimestamp(importer.config.Start.UnixNano())
for _, r := range group.Rules() {
err := importer.importRule(ctx, r.Query().String(), stimeWithAlignment, group.Interval())
if err != nil {
errs = append(errs, err)
}
}
}
_, err := importer.writer.Flush(ctx)
if err != nil {
errs = append(errs, err)
}
return errs
}
// importRule imports the historical data for a single rule.
func (importer *ruleImporter) importRule(ctx context.Context, ruleExpr string, stimeWithAlignment time.Time, internval time.Duration) error {
ts := stimeWithAlignment
appender := importer.writer.Appender(ctx)
for ts.Before(importer.config.End) {
currentBlockEnd := ts.Add(blockSize * time.Hour)
if currentBlockEnd.After(importer.config.End) {
currentBlockEnd = importer.config.End
}
val, warnings, err := importer.apiClient.QueryRange(ctx,
ruleExpr,
v1.Range{
Start: ts,
End: currentBlockEnd,
Step: importer.config.EvalInterval,
},
)
if err != nil {
return err
}
if warnings != nil {
fmt.Fprint(os.Stderr, "warning api.QueryRange:", warnings)
}
var matrix model.Matrix
switch val.Type() {
case model.ValMatrix:
matrix = val.(model.Matrix)
for _, sample := range matrix {
currentLabels := make(labels.Labels, 0, len(sample.Metric))
for k, v := range sample.Metric {
currentLabels = append(currentLabels, labels.Label{
Name: string(k),
Value: string(v),
})
}
for _, value := range sample.Values {
_, err := appender.Add(currentLabels, value.Timestamp.Unix(), float64(value.Value))
if err != nil {
return err
}
}
}
default:
return errors.New("rule result is wrong type")
}
ts = currentBlockEnd
}
_, err := importer.writer.Flush(ctx)
if err != nil {
return err
}
return appender.Commit()
}