251 lines
7.9 KiB
Go
251 lines
7.9 KiB
Go
// Copyright 2020 The Prometheus Authors
|
|
// Licensed under the Apache License, Version 2.0 (the "License");
|
|
// you may not use this file except in compliance with the License.
|
|
// You may obtain a copy of the License at
|
|
//
|
|
// http://www.apache.org/licenses/LICENSE-2.0
|
|
//
|
|
// Unless required by applicable law or agreed to in writing, software
|
|
// distributed under the License is distributed on an "AS IS" BASIS,
|
|
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
// See the License for the specific language governing permissions and
|
|
// limitations under the License.
|
|
|
|
package main
|
|
|
|
import (
|
|
"context"
|
|
"fmt"
|
|
"time"
|
|
|
|
"github.com/go-kit/log"
|
|
"github.com/go-kit/log/level"
|
|
v1 "github.com/prometheus/client_golang/api/prometheus/v1"
|
|
"github.com/prometheus/common/model"
|
|
|
|
"github.com/prometheus/prometheus/model/labels"
|
|
"github.com/prometheus/prometheus/model/timestamp"
|
|
"github.com/prometheus/prometheus/rules"
|
|
"github.com/prometheus/prometheus/storage"
|
|
"github.com/prometheus/prometheus/tsdb"
|
|
tsdb_errors "github.com/prometheus/prometheus/tsdb/errors"
|
|
)
|
|
|
|
const maxSamplesInMemory = 5000
|
|
|
|
type queryRangeAPI interface {
|
|
QueryRange(ctx context.Context, query string, r v1.Range, opts ...v1.Option) (model.Value, v1.Warnings, error)
|
|
}
|
|
|
|
type ruleImporter struct {
|
|
logger log.Logger
|
|
config ruleImporterConfig
|
|
|
|
apiClient queryRangeAPI
|
|
|
|
groups map[string]*rules.Group
|
|
ruleManager *rules.Manager
|
|
}
|
|
|
|
type ruleImporterConfig struct {
|
|
outputDir string
|
|
start time.Time
|
|
end time.Time
|
|
evalInterval time.Duration
|
|
maxBlockDuration time.Duration
|
|
}
|
|
|
|
// newRuleImporter creates a new rule importer that can be used to parse and evaluate recording rule files and create new series
|
|
// written to disk in blocks.
|
|
func newRuleImporter(logger log.Logger, config ruleImporterConfig, apiClient queryRangeAPI) *ruleImporter {
|
|
level.Info(logger).Log("backfiller", "new rule importer", "start", config.start.Format(time.RFC822), "end", config.end.Format(time.RFC822))
|
|
return &ruleImporter{
|
|
logger: logger,
|
|
config: config,
|
|
apiClient: apiClient,
|
|
ruleManager: rules.NewManager(&rules.ManagerOptions{}),
|
|
}
|
|
}
|
|
|
|
// loadGroups parses groups from a list of recording rule files.
|
|
func (importer *ruleImporter) loadGroups(_ context.Context, filenames []string) (errs []error) {
|
|
groups, errs := importer.ruleManager.LoadGroups(importer.config.evalInterval, labels.Labels{}, "", nil, filenames...)
|
|
if errs != nil {
|
|
return errs
|
|
}
|
|
importer.groups = groups
|
|
return nil
|
|
}
|
|
|
|
// importAll evaluates all the recording rules and creates new time series and writes them to disk in blocks.
|
|
func (importer *ruleImporter) importAll(ctx context.Context) (errs []error) {
|
|
for name, group := range importer.groups {
|
|
level.Info(importer.logger).Log("backfiller", "processing group", "name", name)
|
|
|
|
for i, r := range group.Rules() {
|
|
level.Info(importer.logger).Log("backfiller", "processing rule", "id", i, "name", r.Name())
|
|
if err := importer.importRule(ctx, r.Query().String(), r.Name(), r.Labels(), importer.config.start, importer.config.end, int64(importer.config.maxBlockDuration/time.Millisecond), group); err != nil {
|
|
errs = append(errs, err)
|
|
}
|
|
}
|
|
}
|
|
return errs
|
|
}
|
|
|
|
// importRule queries a prometheus API to evaluate rules at times in the past.
|
|
func (importer *ruleImporter) importRule(ctx context.Context, ruleExpr, ruleName string, ruleLabels labels.Labels, start, end time.Time,
|
|
maxBlockDuration int64, grp *rules.Group,
|
|
) (err error) {
|
|
blockDuration := getCompatibleBlockDuration(maxBlockDuration)
|
|
startInMs := start.Unix() * int64(time.Second/time.Millisecond)
|
|
endInMs := end.Unix() * int64(time.Second/time.Millisecond)
|
|
|
|
for startOfBlock := blockDuration * (startInMs / blockDuration); startOfBlock <= endInMs; startOfBlock += blockDuration {
|
|
endOfBlock := startOfBlock + blockDuration - 1
|
|
|
|
currStart := max(startOfBlock/int64(time.Second/time.Millisecond), start.Unix())
|
|
startWithAlignment := grp.EvalTimestamp(time.Unix(currStart, 0).UTC().UnixNano())
|
|
for startWithAlignment.Unix() < currStart {
|
|
startWithAlignment = startWithAlignment.Add(grp.Interval())
|
|
}
|
|
end := time.Unix(min(endOfBlock/int64(time.Second/time.Millisecond), end.Unix()), 0).UTC()
|
|
if end.Before(startWithAlignment) {
|
|
break
|
|
}
|
|
val, warnings, err := importer.apiClient.QueryRange(ctx,
|
|
ruleExpr,
|
|
v1.Range{
|
|
Start: startWithAlignment,
|
|
End: end,
|
|
Step: grp.Interval(),
|
|
},
|
|
)
|
|
if err != nil {
|
|
return fmt.Errorf("query range: %w", err)
|
|
}
|
|
if warnings != nil {
|
|
level.Warn(importer.logger).Log("msg", "Range query returned warnings.", "warnings", warnings)
|
|
}
|
|
|
|
// To prevent races with compaction, a block writer only allows appending samples
|
|
// that are at most half a block size older than the most recent sample appended so far.
|
|
// However, in the way we use the block writer here, compaction doesn't happen, while we
|
|
// also need to append samples throughout the whole block range. To allow that, we
|
|
// pretend that the block is twice as large here, but only really add sample in the
|
|
// original interval later.
|
|
w, err := tsdb.NewBlockWriter(log.NewNopLogger(), importer.config.outputDir, 2*blockDuration)
|
|
if err != nil {
|
|
return fmt.Errorf("new block writer: %w", err)
|
|
}
|
|
var closed bool
|
|
defer func() {
|
|
if !closed {
|
|
err = tsdb_errors.NewMulti(err, w.Close()).Err()
|
|
}
|
|
}()
|
|
app := newMultipleAppender(ctx, w)
|
|
var matrix model.Matrix
|
|
switch val.Type() {
|
|
case model.ValMatrix:
|
|
matrix = val.(model.Matrix)
|
|
|
|
for _, sample := range matrix {
|
|
lb := labels.NewBuilder(labels.Labels{})
|
|
|
|
for name, value := range sample.Metric {
|
|
lb.Set(string(name), string(value))
|
|
}
|
|
|
|
// Setting the rule labels after the output of the query,
|
|
// so they can override query output.
|
|
ruleLabels.Range(func(l labels.Label) {
|
|
lb.Set(l.Name, l.Value)
|
|
})
|
|
|
|
lb.Set(labels.MetricName, ruleName)
|
|
lbls := lb.Labels()
|
|
|
|
for _, value := range sample.Values {
|
|
if err := app.add(ctx, lbls, timestamp.FromTime(value.Timestamp.Time()), float64(value.Value)); err != nil {
|
|
return fmt.Errorf("add: %w", err)
|
|
}
|
|
}
|
|
}
|
|
default:
|
|
return fmt.Errorf("rule result is wrong type %s", val.Type().String())
|
|
}
|
|
|
|
if err := app.flushAndCommit(ctx); err != nil {
|
|
return fmt.Errorf("flush and commit: %w", err)
|
|
}
|
|
err = tsdb_errors.NewMulti(err, w.Close()).Err()
|
|
closed = true
|
|
}
|
|
|
|
return err
|
|
}
|
|
|
|
func newMultipleAppender(ctx context.Context, blockWriter *tsdb.BlockWriter) *multipleAppender {
|
|
return &multipleAppender{
|
|
maxSamplesInMemory: maxSamplesInMemory,
|
|
writer: blockWriter,
|
|
appender: blockWriter.Appender(ctx),
|
|
}
|
|
}
|
|
|
|
// multipleAppender keeps track of how many series have been added to the current appender.
|
|
// If the max samples have been added, then all series are committed and a new appender is created.
|
|
type multipleAppender struct {
|
|
maxSamplesInMemory int
|
|
currentSampleCount int
|
|
writer *tsdb.BlockWriter
|
|
appender storage.Appender
|
|
}
|
|
|
|
func (m *multipleAppender) add(ctx context.Context, l labels.Labels, t int64, v float64) error {
|
|
if _, err := m.appender.Append(0, l, t, v); err != nil {
|
|
return fmt.Errorf("multiappender append: %w", err)
|
|
}
|
|
m.currentSampleCount++
|
|
if m.currentSampleCount >= m.maxSamplesInMemory {
|
|
return m.commit(ctx)
|
|
}
|
|
return nil
|
|
}
|
|
|
|
func (m *multipleAppender) commit(ctx context.Context) error {
|
|
if m.currentSampleCount == 0 {
|
|
return nil
|
|
}
|
|
if err := m.appender.Commit(); err != nil {
|
|
return fmt.Errorf("multiappender commit: %w", err)
|
|
}
|
|
m.appender = m.writer.Appender(ctx)
|
|
m.currentSampleCount = 0
|
|
return nil
|
|
}
|
|
|
|
func (m *multipleAppender) flushAndCommit(ctx context.Context) error {
|
|
if err := m.commit(ctx); err != nil {
|
|
return err
|
|
}
|
|
if _, err := m.writer.Flush(ctx); err != nil {
|
|
return fmt.Errorf("multiappender flush: %w", err)
|
|
}
|
|
return nil
|
|
}
|
|
|
|
func max(x, y int64) int64 {
|
|
if x > y {
|
|
return x
|
|
}
|
|
return y
|
|
}
|
|
|
|
func min(x, y int64) int64 {
|
|
if x < y {
|
|
return x
|
|
}
|
|
return y
|
|
}
|