diff --git a/cmd/promtool/main.go b/cmd/promtool/main.go index a5e4f806f..3b87f0db8 100644 --- a/cmd/promtool/main.go +++ b/cmd/promtool/main.go @@ -44,6 +44,7 @@ import ( "github.com/prometheus/prometheus/discovery/file" "github.com/prometheus/prometheus/discovery/kubernetes" "github.com/prometheus/prometheus/pkg/rulefmt" + "github.com/prometheus/prometheus/tsdb" _ "github.com/prometheus/prometheus/discovery/install" // Register service discovery implementations. ) @@ -140,7 +141,7 @@ func main() { createBlocksFromRulesStart := createBlocksFromRulesCmd.Flag("start", "The time to start backfilling the new rule from. It is required. Start time should be RFC3339 or Unix timestamp."). Required().String() createBlocksFromRulesEnd := createBlocksFromRulesCmd.Flag("end", "If an end time is provided, all recording rules in the rule files provided will be backfilled to the end time. Default will backfill up to 3 hrs ago. End time should be RFC3339 or Unix timestamp.").String() - createBlocksFromRulesOutputDir := createBlocksFromRulesCmd.Flag("output dir", "The filepath on the local filesystem to write the output to. Output will be blocks containing the data of the backfilled recording rules.").Default("backfilldata/").String() + createBlocksFromRulesOutputDir := createBlocksFromRulesCmd.Flag("output dir", "The filepath on the local filesystem to write the output to. Output will be blocks containing the data of the backfilled recording rules. Don't use an active Prometheus data directory. If command is run many times with same start/end time, it will create duplicate series.").Default("backfilldata/").String() createBlocksFromRulesURL := createBlocksFromRulesCmd.Flag("url", "Prometheus API url with the data where the rule will be backfilled from.").Default("http://localhost:9090").String() createBlocksFromRulesEvalInterval := createBlocksFromRulesCmd.Flag("evaluation-interval-default", "How frequently to evaluate rules when backfilling if a value is not set in the rules file."). Default("60s").Duration() @@ -206,7 +207,7 @@ func main() { os.Exit(checkErr(dumpSamples(*dumpPath, *dumpMinTime, *dumpMaxTime))) case createBlocksFromRulesCmd.FullCommand(): - os.Exit(checkErr(BackfillRule(*createBlocksFromRulesURL, *createBlocksFromRulesStart, *createBlocksFromRulesEnd, *createBlocksFromRulesOutputDir, *createBlocksFromRulesEvalInterval, *createBlocksFromRulesFiles...))) + os.Exit(checkErr(CreateBlocksFromRules(*createBlocksFromRulesURL, *createBlocksFromRulesStart, *createBlocksFromRulesEnd, *createBlocksFromRulesOutputDir, *createBlocksFromRulesEvalInterval, *createBlocksFromRulesFiles...))) } } @@ -782,9 +783,9 @@ func (j *jsonPrinter) printLabelValues(v model.LabelValues) { json.NewEncoder(os.Stdout).Encode(v) } -// BackfillRule backfills recording rules from the files provided. The output are blocks of data +// CreateBlocksFromRules backfills recording rules from the files provided. The output are blocks of data // at the outputDir location. -func BackfillRule(url, start, end, outputDir string, evalInterval time.Duration, files ...string) error { +func CreateBlocksFromRules(url, start, end, outputDir string, evalInterval time.Duration, files ...string) error { ctx := context.Background() var stime, etime time.Time var err error @@ -809,19 +810,29 @@ func BackfillRule(url, start, end, outputDir string, evalInterval time.Duration, return nil } + logger := log.NewLogfmtLogger(log.NewSyncWriter(os.Stderr)) + writer, err := tsdb.NewBlockWriter(logger, + outputDir, + tsdb.DefaultBlockDuration, + ) + if err != nil { + fmt.Fprintln(os.Stderr, "new writer error", err) + return err + } + cfg := ruleImporterConfig{ Start: stime, End: etime, - OutputDir: outputDir, EvalInterval: evalInterval, - URL: url, } - logger := log.NewLogfmtLogger(log.NewSyncWriter(os.Stderr)) - ruleImporter := newRuleImporter(logger, cfg) - if err = ruleImporter.init(); err != nil { - fmt.Fprintln(os.Stderr, "rule importer init error", err) + c, err := api.NewClient(api.Config{ + Address: url, + }) + if err != nil { + fmt.Fprintln(os.Stderr, "new api client error", err) return err } + ruleImporter := newRuleImporter(logger, cfg, v1.NewAPI(c), newMultipleAppender(ctx, maxSamplesInMemory, writer)) errs := ruleImporter.loadGroups(ctx, files) for _, err := range errs { diff --git a/cmd/promtool/rules.go b/cmd/promtool/rules.go index 4533e6ebb..359dcc0aa 100644 --- a/cmd/promtool/rules.go +++ b/cmd/promtool/rules.go @@ -22,7 +22,6 @@ import ( "github.com/go-kit/kit/log" "github.com/go-kit/kit/log/level" "github.com/pkg/errors" - "github.com/prometheus/client_golang/api" v1 "github.com/prometheus/client_golang/api/prometheus/v1" "github.com/prometheus/common/model" "github.com/prometheus/prometheus/pkg/labels" @@ -31,160 +30,88 @@ import ( "github.com/prometheus/prometheus/tsdb" ) +const maxSamplesInMemory = 5000 + +type queryRangeAPI interface { + QueryRange(ctx context.Context, query string, r v1.Range) (model.Value, v1.Warnings, error) +} + // ruleImporter is the importer to backfill rules. type ruleImporter struct { logger log.Logger config ruleImporterConfig + apiClient queryRangeAPI + + appender *multipleAppender + groups map[string]*rules.Group - groupLoader rules.GroupLoader - - apiClient v1.API - - writer *tsdb.BlockWriter + ruleManager *rules.Manager } // ruleImporterConfig is the config for the rule importer. type ruleImporterConfig struct { Start time.Time End time.Time - OutputDir string EvalInterval time.Duration - URL string } // newRuleImporter creates a new rule importer that can be used to backfill rules. -func newRuleImporter(logger log.Logger, config ruleImporterConfig) *ruleImporter { +func newRuleImporter(logger log.Logger, config ruleImporterConfig, apiClient queryRangeAPI, appender *multipleAppender) *ruleImporter { return &ruleImporter{ logger: logger, config: config, - groupLoader: rules.FileLoader{}, + apiClient: apiClient, + appender: appender, + ruleManager: rules.NewManager(&rules.ManagerOptions{}), } } -// init initializes the rule importer which creates a new block writer -// and creates an Prometheus API client. -func (importer *ruleImporter) init() error { - w, err := tsdb.NewBlockWriter(importer.logger, - importer.config.OutputDir, - tsdb.DefaultBlockDuration, - ) - if err != nil { - return err - } - importer.writer = w - - config := api.Config{ - Address: importer.config.URL, - } - c, err := api.NewClient(config) - if err != nil { - return err - } - importer.apiClient = v1.NewAPI(c) - return nil -} - -// close cleans up any open resources. -func (importer *ruleImporter) close() error { - return importer.writer.Close() -} - -// loadGroups reads groups from a list of rule files. +// loadGroups parses groups from a list of rule files. func (importer *ruleImporter) loadGroups(ctx context.Context, filenames []string) (errs []error) { - groups := make(map[string]*rules.Group) - - for _, filename := range filenames { - rgs, errs := importer.groupLoader.Load(filename) - if errs != nil { - return errs - } - - for _, ruleGroup := range rgs.Groups { - itv := importer.config.EvalInterval - if ruleGroup.Interval != 0 { - itv = time.Duration(ruleGroup.Interval) - } - - rgRules := make([]rules.Rule, 0, len(ruleGroup.Rules)) - for _, r := range ruleGroup.Rules { - expr, err := importer.groupLoader.Parse(r.Expr.Value) - if err != nil { - return []error{errors.Wrap(err, filename)} - } - rgRules = append(rgRules, rules.NewRecordingRule( - r.Record.Value, - expr, - labels.FromMap(r.Labels), - )) - } - - groups[rules.GroupKey(filename, ruleGroup.Name)] = rules.NewGroup(rules.GroupOptions{ - Name: ruleGroup.Name, - File: filename, - Interval: itv, - Rules: rgRules, - Opts: &rules.ManagerOptions{}, - }) - } + groups, errs := importer.ruleManager.LoadGroups(importer.config.EvalInterval, labels.Labels{}, filenames...) + if errs != nil { + return errs } - importer.groups = groups return nil } // importAll evaluates all the groups and rules and creates new time series // and stores them in new blocks. -func (importer *ruleImporter) importAll(ctx context.Context) []error { - var errs = []error{} - var currentBlockEnd time.Time - var appender storage.Appender - +func (importer *ruleImporter) importAll(ctx context.Context) (errs []error) { for name, group := range importer.groups { level.Info(importer.logger).Log("backfiller", fmt.Sprintf("processing group, name: %s", name)) stimeWithAlignment := group.EvalTimestamp(importer.config.Start.UnixNano()) - ts := stimeWithAlignment - // a 2-hr block that contains all the data for each rule - for ts.Before(importer.config.End) { - currentBlockEnd = ts.Add(time.Duration(tsdb.DefaultBlockDuration) * time.Millisecond) + for stimeWithAlignment.Before(importer.config.End) { + + currentBlockEnd := stimeWithAlignment.Add(time.Duration(tsdb.DefaultBlockDuration) * time.Millisecond) if currentBlockEnd.After(importer.config.End) { currentBlockEnd = importer.config.End } - // should we be creating a new appender for each block? - appender = importer.writer.Appender(ctx) for i, r := range group.Rules() { level.Info(importer.logger).Log("backfiller", fmt.Sprintf("processing rule %d, name: %s", i+1, r.Name())) - err := importer.importRule(ctx, r.Query().String(), r.Name(), r.Labels(), ts, currentBlockEnd, appender) - if err != nil { + if err := importer.importRule(ctx, r.Query().String(), r.Name(), r.Labels(), stimeWithAlignment, currentBlockEnd); err != nil { errs = append(errs, err) } } - ts = currentBlockEnd - _, err := importer.writer.Flush(ctx) - if err != nil { - errs = append(errs, err) - } - - err = appender.Commit() - if err != nil { - errs = append(errs, err) - } + stimeWithAlignment = currentBlockEnd } } return errs } // importRule imports the historical data for a single rule. -func (importer *ruleImporter) importRule(ctx context.Context, ruleExpr, ruleName string, ruleLabels labels.Labels, start, end time.Time, appender storage.Appender) error { +func (importer *ruleImporter) importRule(ctx context.Context, ruleExpr, ruleName string, ruleLabels labels.Labels, start, end time.Time) error { val, warnings, err := importer.apiClient.QueryRange(ctx, ruleExpr, v1.Range{ Start: start, End: end, - Step: importer.config.EvalInterval, + Step: importer.config.EvalInterval, // todo: did we check if the rule has an interval? }, ) if err != nil { @@ -200,7 +127,7 @@ func (importer *ruleImporter) importRule(ctx context.Context, ruleExpr, ruleName matrix = val.(model.Matrix) for _, sample := range matrix { - currentLabels := make(labels.Labels, 0, len(sample.Metric)) + currentLabels := make(labels.Labels, 0, len(sample.Metric)+len(ruleLabels)+1) currentLabels = append(currentLabels, labels.Label{ Name: labels.MetricName, Value: ruleName, @@ -215,14 +142,58 @@ func (importer *ruleImporter) importRule(ctx context.Context, ruleExpr, ruleName }) } for _, value := range sample.Values { - _, err := appender.Add(currentLabels, value.Timestamp.Unix(), float64(value.Value)) - if err != nil { + if err := importer.appender.add(ctx, currentLabels, value.Timestamp.Unix(), float64(value.Value)); err != nil { return err } } } default: - return errors.New("rule result is wrong type") + return errors.New(fmt.Sprintf("rule result is wrong type %s", val.Type().String())) } return nil } + +// multipleAppender keeps track of how many series have been added to the current appender. +// If the max samples have been added, then all series are flushed to disk and commited and a new +// appender is created. +type multipleAppender struct { + maxSamplesInMemory int + currentSampleCount int + writer *tsdb.BlockWriter + appender storage.Appender +} + +func newMultipleAppender(ctx context.Context, maxSamplesInMemory int, blockWriter *tsdb.BlockWriter) *multipleAppender { + return &multipleAppender{ + maxSamplesInMemory: maxSamplesInMemory, + writer: blockWriter, + appender: blockWriter.Appender(ctx), + } +} + +func (m *multipleAppender) add(ctx context.Context, l labels.Labels, t int64, v float64) error { + if _, err := m.appender.Add(l, t, v); err != nil { + return err + } + m.currentSampleCount++ + if m.currentSampleCount > m.maxSamplesInMemory { + return m.flushAndCommit(ctx) + } + return nil +} + +func (m *multipleAppender) flushAndCommit(ctx context.Context) error { + if _, err := m.writer.Flush(ctx); err != nil { + return err + } + if err := m.appender.Commit(); err != nil { + return err + } + m.appender = m.writer.Appender(ctx) + m.currentSampleCount = 0 + return nil +} + +func (m *multipleAppender) close() error { + return m.writer.Close() +}