diff --git a/cmd/promtool/main.go b/cmd/promtool/main.go index e389bf75b..74804fdf6 100644 --- a/cmd/promtool/main.go +++ b/cmd/promtool/main.go @@ -143,8 +143,8 @@ func main() { Default("-3h").String() backfillOutputDir := backfillRuleCmd.Flag("output dir", "The filepath on the local filesystem to write the output to. Output will be blocks containing the data of the backfilled recording rules.").Default("backfilldata/").String() backfillRuleURL := backfillRuleCmd.Flag("url", "Prometheus API url with the data where the rule will be backfilled from.").Default("localhost:9090").String() - backfillRuleEvalInterval := backfillRuleCmd.Flag("evaluation_interval", "How frequently to evaluate rules when backfilling."). - Default("15s").Duration() + backfillRuleEvalInterval := backfillRuleCmd.Flag("evaluation_interval default", "How frequently to evaluate rules when backfilling. evaluation interval in the rules file will take precedence."). + Default("60s").Duration() backfillRuleFiles := backfillRuleCmd.Arg( "rule-files", "A list of one or more files containing recording rules to be backfilled. All recording rules listed in the files will be backfilled. Alerting rules are not evaluated.", @@ -207,7 +207,7 @@ func main() { os.Exit(checkErr(dumpSamples(*dumpPath, *dumpMinTime, *dumpMaxTime))) case backfillRuleCmd.FullCommand(): - os.Exit(BackfillRule(*backfillRuleURL, *backfillRuleStart, *backfillRuleEnd, *backfillOutputDir, *backfillRuleEvalInterval, *backfillRuleFiles...)) + os.Exit(checkErr(BackfillRule(*backfillRuleURL, *backfillRuleStart, *backfillRuleEnd, *backfillOutputDir, *backfillRuleEvalInterval, *backfillRuleFiles...))) } } @@ -785,12 +785,12 @@ func (j *jsonPrinter) printLabelValues(v model.LabelValues) { // BackfillRule backfills recording rules from the files provided. The output are blocks of data // at the outputDir location. -func BackfillRule(url, start, end, outputDir string, evalInterval time.Duration, files ...string) int { +func BackfillRule(url, start, end, outputDir string, evalInterval time.Duration, files ...string) error { ctx := context.Background() stime, etime, err := parseStartTimeAndEndTime(start, end) if err != nil { fmt.Fprintln(os.Stderr, err) - return 1 + return err } cfg := RuleImporterConfig{ Start: stime, @@ -801,17 +801,16 @@ func BackfillRule(url, start, end, outputDir string, evalInterval time.Duration, } logger := log.NewLogfmtLogger(log.NewSyncWriter(os.Stderr)) ruleImporter := NewRuleImporter(logger, cfg) - err = ruleImporter.Init() - if err != nil { + if err = ruleImporter.Init(); err != nil { fmt.Fprintln(os.Stderr, "rule importer init error", err) - return 1 + return err } errs := ruleImporter.LoadGroups(ctx, files) for _, err := range errs { if err != nil { fmt.Fprintln(os.Stderr, "rule importer parse error", err) - return 1 + return err } } @@ -822,5 +821,5 @@ func BackfillRule(url, start, end, outputDir string, evalInterval time.Duration, } } - return 0 + return nil } diff --git a/cmd/promtool/rules.go b/cmd/promtool/rules.go index 5fa100e49..cb0586804 100644 --- a/cmd/promtool/rules.go +++ b/cmd/promtool/rules.go @@ -26,7 +26,7 @@ import ( "github.com/prometheus/common/model" "github.com/prometheus/prometheus/pkg/labels" "github.com/prometheus/prometheus/rules" - "github.com/prometheus/prometheus/tsdb/importer/blocks" + "github.com/prometheus/prometheus/tsdb" ) const blockSize = 2 // in hours @@ -64,9 +64,9 @@ func NewRuleImporter(logger log.Logger, config RuleImporterConfig) *RuleImporter // Init initializes the rule importer which creates a new block writer // and creates an Prometheus API client. func (importer *RuleImporter) Init() error { - importer.writer = blocks.NewMultiWriter(importer.logger, + importer.writer = tsdb.NewBlockWriter(importer.logger, importer.config.OutputDir, - importer.config.EvalInterval.Nanoseconds(), + (blockSize * time.Hour).Milliseconds() ) config := api.Config{ diff --git a/tsdb/importer/blocks/multi.go b/tsdb/importer/blocks/multi.go deleted file mode 100644 index 4007860d8..000000000 --- a/tsdb/importer/blocks/multi.go +++ /dev/null @@ -1,120 +0,0 @@ -// Copyright 2020 The Prometheus Authors -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package blocks - -import ( - "github.com/go-kit/kit/log" - "github.com/oklog/ulid" - "github.com/pkg/errors" - "github.com/prometheus/prometheus/pkg/labels" - "github.com/prometheus/prometheus/storage" - tsdb_errors "github.com/prometheus/prometheus/tsdb/errors" - "github.com/prometheus/prometheus/tsdb/index" -) - -type errAppender struct{ err error } - -func (a errAppender) Add(l labels.Labels, t int64, v float64) (uint64, error) { return 0, a.err } -func (a errAppender) AddFast(ref uint64, t int64, v float64) error { return a.err } -func (a errAppender) Commit() error { return a.err } -func (a errAppender) Rollback() error { return a.err } - -func rangeForTimestamp(t int64, width int64) (maxt int64) { - return (t/width)*width + width -} - -type MultiWriter struct { - blocks map[index.Range]Writer - activeAppenders map[index.Range]storage.Appender - - logger log.Logger - dir string - // TODO(bwplotka): Allow more complex compaction levels. - sizeMillis int64 -} - -func NewMultiWriter(logger log.Logger, dir string, sizeMillis int64) *MultiWriter { - return &MultiWriter{ - logger: logger, - dir: dir, - sizeMillis: sizeMillis, - blocks: map[index.Range]Writer{}, - activeAppenders: map[index.Range]storage.Appender{}, - } -} - -// Appender is not thread-safe. Returned Appender is not thread-save as well. -// TODO(bwplotka): Consider making it thread safe. -func (w *MultiWriter) Appender() storage.Appender { return w } - -func (w *MultiWriter) getOrCreate(t int64) storage.Appender { - maxt := rangeForTimestamp(t, w.sizeMillis) - hash := index.Range{Start: maxt - w.sizeMillis, End: maxt} - if a, ok := w.activeAppenders[hash]; ok { - return a - } - - nw, err := NewTSDBWriter(w.logger, w.dir) - if err != nil { - return errAppender{err: errors.Wrap(err, "new tsdb writer")} - } - - w.blocks[hash] = nw - w.activeAppenders[hash] = nw.Appender() - return w.activeAppenders[hash] -} - -func (w *MultiWriter) Add(l labels.Labels, t int64, v float64) (uint64, error) { - return w.getOrCreate(t).Add(l, t, v) -} - -func (w *MultiWriter) AddFast(ref uint64, t int64, v float64) error { - return w.getOrCreate(t).AddFast(ref, t, v) -} - -func (w *MultiWriter) Commit() error { - var merr tsdb_errors.MultiError - for _, a := range w.activeAppenders { - merr.Add(a.Commit()) - } - return merr.Err() -} - -func (w *MultiWriter) Rollback() error { - var merr tsdb_errors.MultiError - for _, a := range w.activeAppenders { - merr.Add(a.Rollback()) - } - return merr.Err() -} - -func (w *MultiWriter) Flush() ([]ulid.ULID, error) { - ids := make([]ulid.ULID, 0, len(w.blocks)) - for _, b := range w.blocks { - id, err := b.Flush() - if err != nil { - return nil, err - } - ids = append(ids, id...) - } - return ids, nil -} - -func (w *MultiWriter) Close() error { - var merr tsdb_errors.MultiError - for _, b := range w.blocks { - merr.Add(b.Close()) - } - return merr.Err() -} diff --git a/tsdb/importer/blocks/writer.go b/tsdb/importer/blocks/writer.go deleted file mode 100644 index 3af869e06..000000000 --- a/tsdb/importer/blocks/writer.go +++ /dev/null @@ -1,139 +0,0 @@ -// Copyright 2020 The Prometheus Authors -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package blocks - -import ( - "context" - "io/ioutil" - "math" - "os" - "time" - - "github.com/oklog/ulid" - "github.com/prometheus/prometheus/storage" - "github.com/prometheus/prometheus/tsdb" - - "github.com/go-kit/kit/log" - "github.com/go-kit/kit/log/level" - "github.com/pkg/errors" - "github.com/prometheus/prometheus/pkg/timestamp" - "github.com/prometheus/prometheus/tsdb/chunkenc" -) - -// Writer is interface to write time series into Prometheus blocks. -type Writer interface { - storage.Appendable - - // Flush writes current data to disk. - // The block or blocks will contain values accumulated by `Write`. - Flush() ([]ulid.ULID, error) - - // Close releases all resources. No append is allowed anymore to such writer. - Close() error -} - -var _ Writer = &TSDBWriter{} - -// Writer is a block writer that allows appending and flushing to disk. -type TSDBWriter struct { - logger log.Logger - dir string - - head *tsdb.Head - tmpDir string -} - -func durToMillis(t time.Duration) int64 { - return int64(t.Seconds() * 1000) -} - -// NewTSDBWriter create new block writer. -// -// The returned writer accumulates all series in memory until `Flush` is called. -// -// Note that the writer will not check if the target directory exists or -// contains anything at all. It is the caller's responsibility to -// ensure that the resulting blocks do not overlap etc. -// Writer ensures the block flush is atomic (via rename). -func NewTSDBWriter(logger log.Logger, dir string) (*TSDBWriter, error) { - res := &TSDBWriter{ - logger: logger, - dir: dir, - } - return res, res.initHead() -} - -// initHead creates and initialises new head. -func (w *TSDBWriter) initHead() error { - logger := w.logger - - // Keep Registerer and WAL nil as we don't use them. - // Put huge chunkRange; It has to be equal then expected block size. - // Since we don't have info about block size here, set it to large number. - - tmpDir, err := ioutil.TempDir(os.TempDir(), "head") - if err != nil { - return errors.Wrap(err, "create temp dir") - } - w.tmpDir = tmpDir - - h, err := tsdb.NewHead(nil, logger, nil, durToMillis(9999*time.Hour), w.tmpDir, nil, tsdb.DefaultStripeSize, nil) - if err != nil { - return errors.Wrap(err, "tsdb.NewHead") - } - - w.head = h - return w.head.Init(math.MinInt64) -} - -// Appender is not thread-safe. Returned Appender is thread-save however. -func (w *TSDBWriter) Appender() storage.Appender { - return w.head.Appender() -} - -// Flush implements Writer interface. This is where actual block writing -// happens. After flush completes, no write can be done. -func (w *TSDBWriter) Flush() ([]ulid.ULID, error) { - seriesCount := w.head.NumSeries() - if w.head.NumSeries() == 0 { - return nil, errors.New("no series appended; aborting.") - } - - mint := w.head.MinTime() - maxt := w.head.MaxTime() + 1 - level.Info(w.logger).Log("msg", "flushing", "series_count", seriesCount, "mint", timestamp.Time(mint), "maxt", timestamp.Time(maxt)) - - // Flush head to disk as a block. - compactor, err := tsdb.NewLeveledCompactor( - context.Background(), - nil, - w.logger, - []int64{durToMillis(2 * time.Hour)}, // Does not matter, used only for planning. - chunkenc.NewPool()) - if err != nil { - return nil, errors.Wrap(err, "create leveled compactor") - } - id, err := compactor.Write(w.dir, w.head, mint, maxt, nil) - if err != nil { - return nil, errors.Wrap(err, "compactor write") - } - // TODO(bwplotka): Potential truncate head, and allow writer reuse. Currently truncating fails with - // truncate chunks.HeadReadWriter: maxt of the files are not set. - return []ulid.ULID{id}, nil -} - -func (w *TSDBWriter) Close() error { - _ = os.RemoveAll(w.tmpDir) - return w.head.Close() -}