From 7504b5ce7ccb5f38413a5342e6c1ec01da5ef99b Mon Sep 17 00:00:00 2001 From: jessicagreben Date: Mon, 27 Jul 2020 07:44:49 -0700 Subject: [PATCH] add rule importer with tsdb block writer Signed-off-by: jessicagreben --- cmd/promtool/main.go | 54 +++++++ importers/rules.go | 256 +++++++++++++++++++++++++++++++++ tsdb/importer/blocks/multi.go | 120 ++++++++++++++++ tsdb/importer/blocks/writer.go | 139 ++++++++++++++++++ 4 files changed, 569 insertions(+) create mode 100644 importers/rules.go create mode 100644 tsdb/importer/blocks/multi.go create mode 100644 tsdb/importer/blocks/writer.go diff --git a/cmd/promtool/main.go b/cmd/promtool/main.go index f99f63cdd..edddaf8b0 100644 --- a/cmd/promtool/main.go +++ b/cmd/promtool/main.go @@ -28,6 +28,7 @@ import ( "strings" "time" + "github.com/go-kit/kit/log" "github.com/google/pprof/profile" "github.com/pkg/errors" "github.com/prometheus/client_golang/api" @@ -40,6 +41,7 @@ import ( "gopkg.in/alecthomas/kingpin.v2" "github.com/prometheus/prometheus/config" + "github.com/prometheus/prometheus/importers" "github.com/prometheus/prometheus/pkg/rulefmt" ) @@ -128,6 +130,20 @@ func main() { dumpMinTime := tsdbDumpCmd.Flag("min-time", "Minimum timestamp to dump.").Default(strconv.FormatInt(math.MinInt64, 10)).Int64() dumpMaxTime := tsdbDumpCmd.Flag("max-time", "Maximum timestamp to dump.").Default(strconv.FormatInt(math.MaxInt64, 10)).Int64() + backfillCmd := app.Command("backfill", "Backfill Prometheus data.") + backfillRuleCmd := backfillCmd.Command("rules", "Backfill Prometheus data for new rules.") + backfillRuleURL := backfillRuleCmd.Flag("url", "Prometheus API url.").Required().String() + backfillRuleEvalInterval := backfillRuleCmd.Flag("evaluation_interval", "How frequently to evaluate rules when backfilling."). + Default("-3h").Duration() + backfillRuleStart := backfillRuleCmd.Flag("start", "The time to start backfilling the new rule from. It is required. Start time should be RFC3339 or Unix timestamp."). + Required().Duration() + backfillRuleEnd := backfillRuleCmd.Flag("end", "If an end time is provided, the new rule backfilling will end at this time. The default will backfill to the 3 hrs ago. End time should be RFC3339 or Unix timestamp."). + Default("").Duration() + backfillRuleFiles := backfillRuleCmd.Arg( + "rule-files", + "The file containing the new rule that needs to be backfilled.", + ).Required().ExistingFiles() + parsedCmd := kingpin.MustParse(app.Parse(os.Args[1:])) var p printer @@ -183,6 +199,9 @@ func main() { case tsdbDumpCmd.FullCommand(): os.Exit(checkErr(dumpSamples(*dumpPath, *dumpMinTime, *dumpMaxTime))) + + case backfillRuleCmd.FullCommand(): + os.Exit(BackfillRule(*backfillRuleURL, *backfillRuleStart, *backfillRuleEnd, *backfillRuleEvalInterval, *backfillRuleFiles...)) } } @@ -747,3 +766,38 @@ func (j *jsonPrinter) printLabelValues(v model.LabelValues) { //nolint:errcheck json.NewEncoder(os.Stdout).Encode(v) } + +// BackfillRule backfills rules from the files provided +func BackfillRule(url string, start, end, evalInterval time.Duration, files ...string) int { + ctx := context.Background() + cfg := importers.RuleConfig{ + Start: start.String(), + End: end.String(), + EvalInterval: evalInterval, + URL: url, + } + logger := log.NewLogfmtLogger(log.NewSyncWriter(os.Stderr)) + ruleImporter := importers.NewRuleImporter(logger, cfg) + err := ruleImporter.Init() + if err != nil { + fmt.Fprintln(os.Stderr, "rule importer init error", err) + return 1 + } + + errs := ruleImporter.Parse(ctx, files) + for _, err := range errs { + if err != nil { + fmt.Fprintln(os.Stderr, "rule importer parse error", err) + return 1 + } + } + + errs = ruleImporter.ImportAll(ctx) + for _, err := range errs { + if err != nil { + fmt.Fprintln(os.Stderr, "rule importer error", err) + } + } + + return 0 +} diff --git a/importers/rules.go b/importers/rules.go new file mode 100644 index 000000000..a103eadb1 --- /dev/null +++ b/importers/rules.go @@ -0,0 +1,256 @@ +// Copyright 2020 The Prometheus Authors +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package importers + +import ( + "context" + "fmt" + "io/ioutil" + "math" + "net/url" + "os" + "sort" + "strconv" + "time" + + "github.com/go-kit/kit/log" + "github.com/pkg/errors" + "github.com/prometheus/client_golang/api" + v1 "github.com/prometheus/client_golang/api/prometheus/v1" + "github.com/prometheus/common/model" + "github.com/prometheus/prometheus/pkg/labels" + plabels "github.com/prometheus/prometheus/pkg/labels" + "github.com/prometheus/prometheus/pkg/rulefmt" + "github.com/prometheus/prometheus/promql" + "github.com/prometheus/prometheus/promql/parser" + "github.com/prometheus/prometheus/rules" + "github.com/prometheus/prometheus/tsdb/importer/blocks" +) + +// RuleImporter is the importer for rules +type RuleImporter struct { + logger log.Logger + + config RuleConfig + groups map[string]*rules.Group + + apiClient v1.API + + writer *blocks.MultiWriter +} + +// RuleConfig is the config for the rule importer +type RuleConfig struct { + Start string + End string + EvalInterval time.Duration + URL string +} + +// NewRuleImporter creates a new rule importer +func NewRuleImporter(logger log.Logger, config RuleConfig) *RuleImporter { + return &RuleImporter{ + config: config, + } +} + +// Init initializes the rule importer which creates a new block writer +// and creates an Prometheus API client +func (importer *RuleImporter) Init() error { + // create new block writer + newBlockDir, err := ioutil.TempDir("", "rule_blocks") + if err != nil { + return err + } + importer.writer = blocks.NewMultiWriter(importer.logger, newBlockDir, importer.config.EvalInterval.Nanoseconds()) + + // create api client + config := api.Config{ + Address: importer.config.URL, + } + c, err := api.NewClient(config) + if err != nil { + return err + } + importer.apiClient = v1.NewAPI(c) + return nil +} + +// Close cleans up any open resources +func (importer *RuleImporter) Close() error { + return importer.writer.Close() +} + +// Parse parses the groups and rules from a list of rules files +func (importer *RuleImporter) Parse(ctx context.Context, files []string) (errs []error) { + groups := make(map[string]*rules.Group) + + for _, file := range files { + ruleGroups, errs := rulefmt.ParseFile(file) + if errs != nil { + return errs + } + + for _, ruleGroup := range ruleGroups.Groups { + itv := importer.config.EvalInterval + if ruleGroup.Interval != 0 { + itv = time.Duration(ruleGroup.Interval) + } + + rulez := make([]rules.Rule, 0, len(ruleGroup.Rules)) + for _, r := range ruleGroup.Rules { + expr, err := parser.ParseExpr(r.Expr.Value) + if err != nil { + return []error{errors.Wrap(err, file)} + } + + rulez = append(rulez, rules.NewRecordingRule( + r.Record.Value, + expr, + labels.FromMap(r.Labels), + )) + } + + groups[file+";"+ruleGroup.Name] = rules.NewGroup(rules.GroupOptions{ + Name: ruleGroup.Name, + File: file, + Interval: itv, + Rules: rulez, + }) + } + } + + importer.groups = groups + return errs +} + +// ImportAll evaluates all the groups and rules and creates new time series +// and stores in new blocks +func (importer *RuleImporter) ImportAll(ctx context.Context) []error { + var errs = []error{} + for _, group := range importer.groups { + for _, rule := range group.Rules() { + err := importer.ImportRule(ctx, rule) + if err != nil { + errs = append(errs, err) + } + } + } + err := importer.CreateBlocks() + if err != nil { + errs = append(errs, err) + } + return errs +} + +func (importer *RuleImporter) queryFn(ctx context.Context, q string, t time.Time) (promql.Vector, error) { + val, warnings, err := importer.apiClient.Query(ctx, q, t) + if err != nil { + return promql.Vector{}, err + } + if warnings != nil { + fmt.Fprint(os.Stderr, "warning api.Query:", warnings) + } + + switch val.Type() { + case model.ValVector: + valVector := val.(model.Vector) + return modelToPromqlVector(valVector), nil + case model.ValScalar: + valScalar := val.(*model.Scalar) + return promql.Vector{promql.Sample{ + Metric: labels.Labels{}, + Point: promql.Point{T: int64(valScalar.Timestamp), V: float64(valScalar.Value)}, + }}, nil + default: + return nil, errors.New("rule result is wrong type") + } +} + +func modelToPromqlVector(modelValue model.Vector) promql.Vector { + result := make(promql.Vector, 0, len(modelValue)) + + for _, value := range modelValue { + labels := make(labels.Labels, 0, len(value.Metric)) + + for k, v := range value.Metric { + labels = append(labels, plabels.Label{ + Name: string(k), + Value: string(v), + }) + } + sort.Sort(labels) + + result = append(result, promql.Sample{ + Metric: labels, + Point: promql.Point{T: int64(value.Timestamp), V: float64(value.Value)}, + }) + } + return result +} + +// ImportRule imports the historical data for a single rule +func (importer *RuleImporter) ImportRule(ctx context.Context, rule rules.Rule) error { + ts, err := parseTime(importer.config.Start) + if err != nil { + return err + } + end, err := parseTime(importer.config.End) + if err != nil { + return err + } + url, err := url.Parse(importer.config.URL) + if err != nil { + return err + } + + appender := importer.writer.Appender() + for ts.Before(end) { + vector, err := rule.Eval(ctx, ts, importer.queryFn, url) + if err != nil { + return err + } + for _, sample := range vector { + // we don't AddFast here because we need to maintain the + // ref for each series bcs rule.Eval could return different labels, + // so that means you would need to map the ref to metric, but that is what Add does + // anyways so just use that + _, err := appender.Add(plabels.Labels{plabels.Label{Name: sample.String()}}, sample.T, sample.V) + if err != nil { + return err + } + } + + ts.Add(importer.config.EvalInterval) + // todo: 2 hr blocks? + } + return appender.Commit() +} + +func parseTime(s string) (time.Time, error) { + if t, err := strconv.ParseFloat(s, 64); err == nil { + s, ns := math.Modf(t) + return time.Unix(int64(s), int64(ns*float64(time.Second))).UTC(), nil + } + if t, err := time.Parse(time.RFC3339Nano, s); err == nil { + return t, nil + } + return time.Time{}, errors.Errorf("cannot parse %q to a valid timestamp", s) +} + +// CreateBlocks creates blocks for all the rule data +func (importer *RuleImporter) CreateBlocks() error { + _, err := importer.writer.Flush() + return err +} diff --git a/tsdb/importer/blocks/multi.go b/tsdb/importer/blocks/multi.go new file mode 100644 index 000000000..4007860d8 --- /dev/null +++ b/tsdb/importer/blocks/multi.go @@ -0,0 +1,120 @@ +// Copyright 2020 The Prometheus Authors +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package blocks + +import ( + "github.com/go-kit/kit/log" + "github.com/oklog/ulid" + "github.com/pkg/errors" + "github.com/prometheus/prometheus/pkg/labels" + "github.com/prometheus/prometheus/storage" + tsdb_errors "github.com/prometheus/prometheus/tsdb/errors" + "github.com/prometheus/prometheus/tsdb/index" +) + +type errAppender struct{ err error } + +func (a errAppender) Add(l labels.Labels, t int64, v float64) (uint64, error) { return 0, a.err } +func (a errAppender) AddFast(ref uint64, t int64, v float64) error { return a.err } +func (a errAppender) Commit() error { return a.err } +func (a errAppender) Rollback() error { return a.err } + +func rangeForTimestamp(t int64, width int64) (maxt int64) { + return (t/width)*width + width +} + +type MultiWriter struct { + blocks map[index.Range]Writer + activeAppenders map[index.Range]storage.Appender + + logger log.Logger + dir string + // TODO(bwplotka): Allow more complex compaction levels. + sizeMillis int64 +} + +func NewMultiWriter(logger log.Logger, dir string, sizeMillis int64) *MultiWriter { + return &MultiWriter{ + logger: logger, + dir: dir, + sizeMillis: sizeMillis, + blocks: map[index.Range]Writer{}, + activeAppenders: map[index.Range]storage.Appender{}, + } +} + +// Appender is not thread-safe. Returned Appender is not thread-save as well. +// TODO(bwplotka): Consider making it thread safe. +func (w *MultiWriter) Appender() storage.Appender { return w } + +func (w *MultiWriter) getOrCreate(t int64) storage.Appender { + maxt := rangeForTimestamp(t, w.sizeMillis) + hash := index.Range{Start: maxt - w.sizeMillis, End: maxt} + if a, ok := w.activeAppenders[hash]; ok { + return a + } + + nw, err := NewTSDBWriter(w.logger, w.dir) + if err != nil { + return errAppender{err: errors.Wrap(err, "new tsdb writer")} + } + + w.blocks[hash] = nw + w.activeAppenders[hash] = nw.Appender() + return w.activeAppenders[hash] +} + +func (w *MultiWriter) Add(l labels.Labels, t int64, v float64) (uint64, error) { + return w.getOrCreate(t).Add(l, t, v) +} + +func (w *MultiWriter) AddFast(ref uint64, t int64, v float64) error { + return w.getOrCreate(t).AddFast(ref, t, v) +} + +func (w *MultiWriter) Commit() error { + var merr tsdb_errors.MultiError + for _, a := range w.activeAppenders { + merr.Add(a.Commit()) + } + return merr.Err() +} + +func (w *MultiWriter) Rollback() error { + var merr tsdb_errors.MultiError + for _, a := range w.activeAppenders { + merr.Add(a.Rollback()) + } + return merr.Err() +} + +func (w *MultiWriter) Flush() ([]ulid.ULID, error) { + ids := make([]ulid.ULID, 0, len(w.blocks)) + for _, b := range w.blocks { + id, err := b.Flush() + if err != nil { + return nil, err + } + ids = append(ids, id...) + } + return ids, nil +} + +func (w *MultiWriter) Close() error { + var merr tsdb_errors.MultiError + for _, b := range w.blocks { + merr.Add(b.Close()) + } + return merr.Err() +} diff --git a/tsdb/importer/blocks/writer.go b/tsdb/importer/blocks/writer.go new file mode 100644 index 000000000..3af869e06 --- /dev/null +++ b/tsdb/importer/blocks/writer.go @@ -0,0 +1,139 @@ +// Copyright 2020 The Prometheus Authors +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package blocks + +import ( + "context" + "io/ioutil" + "math" + "os" + "time" + + "github.com/oklog/ulid" + "github.com/prometheus/prometheus/storage" + "github.com/prometheus/prometheus/tsdb" + + "github.com/go-kit/kit/log" + "github.com/go-kit/kit/log/level" + "github.com/pkg/errors" + "github.com/prometheus/prometheus/pkg/timestamp" + "github.com/prometheus/prometheus/tsdb/chunkenc" +) + +// Writer is interface to write time series into Prometheus blocks. +type Writer interface { + storage.Appendable + + // Flush writes current data to disk. + // The block or blocks will contain values accumulated by `Write`. + Flush() ([]ulid.ULID, error) + + // Close releases all resources. No append is allowed anymore to such writer. + Close() error +} + +var _ Writer = &TSDBWriter{} + +// Writer is a block writer that allows appending and flushing to disk. +type TSDBWriter struct { + logger log.Logger + dir string + + head *tsdb.Head + tmpDir string +} + +func durToMillis(t time.Duration) int64 { + return int64(t.Seconds() * 1000) +} + +// NewTSDBWriter create new block writer. +// +// The returned writer accumulates all series in memory until `Flush` is called. +// +// Note that the writer will not check if the target directory exists or +// contains anything at all. It is the caller's responsibility to +// ensure that the resulting blocks do not overlap etc. +// Writer ensures the block flush is atomic (via rename). +func NewTSDBWriter(logger log.Logger, dir string) (*TSDBWriter, error) { + res := &TSDBWriter{ + logger: logger, + dir: dir, + } + return res, res.initHead() +} + +// initHead creates and initialises new head. +func (w *TSDBWriter) initHead() error { + logger := w.logger + + // Keep Registerer and WAL nil as we don't use them. + // Put huge chunkRange; It has to be equal then expected block size. + // Since we don't have info about block size here, set it to large number. + + tmpDir, err := ioutil.TempDir(os.TempDir(), "head") + if err != nil { + return errors.Wrap(err, "create temp dir") + } + w.tmpDir = tmpDir + + h, err := tsdb.NewHead(nil, logger, nil, durToMillis(9999*time.Hour), w.tmpDir, nil, tsdb.DefaultStripeSize, nil) + if err != nil { + return errors.Wrap(err, "tsdb.NewHead") + } + + w.head = h + return w.head.Init(math.MinInt64) +} + +// Appender is not thread-safe. Returned Appender is thread-save however. +func (w *TSDBWriter) Appender() storage.Appender { + return w.head.Appender() +} + +// Flush implements Writer interface. This is where actual block writing +// happens. After flush completes, no write can be done. +func (w *TSDBWriter) Flush() ([]ulid.ULID, error) { + seriesCount := w.head.NumSeries() + if w.head.NumSeries() == 0 { + return nil, errors.New("no series appended; aborting.") + } + + mint := w.head.MinTime() + maxt := w.head.MaxTime() + 1 + level.Info(w.logger).Log("msg", "flushing", "series_count", seriesCount, "mint", timestamp.Time(mint), "maxt", timestamp.Time(maxt)) + + // Flush head to disk as a block. + compactor, err := tsdb.NewLeveledCompactor( + context.Background(), + nil, + w.logger, + []int64{durToMillis(2 * time.Hour)}, // Does not matter, used only for planning. + chunkenc.NewPool()) + if err != nil { + return nil, errors.Wrap(err, "create leveled compactor") + } + id, err := compactor.Write(w.dir, w.head, mint, maxt, nil) + if err != nil { + return nil, errors.Wrap(err, "compactor write") + } + // TODO(bwplotka): Potential truncate head, and allow writer reuse. Currently truncating fails with + // truncate chunks.HeadReadWriter: maxt of the files are not set. + return []ulid.ULID{id}, nil +} + +func (w *TSDBWriter) Close() error { + _ = os.RemoveAll(w.tmpDir) + return w.head.Close() +}