// Copyright 2017 The Prometheus Authors // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. // You may obtain a copy of the License at // // http://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, software // distributed under the License is distributed on an "AS IS" BASIS, // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. // See the License for the specific language governing permissions and // limitations under the License. package main import ( "flag" "fmt" "io" "io/ioutil" "net/http" _ "net/http/pprof" "os" "path/filepath" "runtime" "runtime/pprof" "sync" "time" "unsafe" "github.com/pkg/errors" promlabels "github.com/prometheus/prometheus/pkg/labels" "github.com/prometheus/prometheus/pkg/textparse" "github.com/prometheus/tsdb" "github.com/prometheus/tsdb/labels" "github.com/spf13/cobra" ) func main() { // Start HTTP server for pprof endpoint. go http.ListenAndServe(":9999", nil) root := &cobra.Command{ Use: "tsdb", Short: "CLI tool for tsdb", } root.AddCommand( NewBenchCommand(), ) flag.CommandLine.Set("log.level", "debug") root.Execute() } func NewBenchCommand() *cobra.Command { c := &cobra.Command{ Use: "bench", Short: "run benchmarks", } c.AddCommand(NewBenchWriteCommand()) return c } type writeBenchmark struct { outPath string cleanup bool numMetrics int storage *tsdb.DB cpuprof *os.File memprof *os.File blockprof *os.File mtxprof *os.File } func NewBenchWriteCommand() *cobra.Command { var wb writeBenchmark c := &cobra.Command{ Use: "write ", Short: "run a write performance benchmark", Run: wb.run, } c.PersistentFlags().StringVar(&wb.outPath, "out", "benchout/", "set the output path") c.PersistentFlags().IntVar(&wb.numMetrics, "metrics", 10000, "number of metrics to read") return c } func (b *writeBenchmark) run(cmd *cobra.Command, args []string) { if len(args) != 1 { exitWithError(fmt.Errorf("missing file argument")) } if b.outPath == "" { dir, err := ioutil.TempDir("", "tsdb_bench") if err != nil { exitWithError(err) } b.outPath = dir b.cleanup = true } if err := os.RemoveAll(b.outPath); err != nil { exitWithError(err) } if err := os.MkdirAll(b.outPath, 0777); err != nil { exitWithError(err) } dir := filepath.Join(b.outPath, "storage") st, err := tsdb.Open(dir, nil, nil, &tsdb.Options{ WALFlushInterval: 200 * time.Millisecond, RetentionDuration: 15 * 24 * 60 * 60 * 1000, // 15 days in milliseconds BlockRanges: tsdb.ExponentialBlockRanges(2*60*60*1000, 5, 3), }) if err != nil { exitWithError(err) } b.storage = st var metrics []labels.Labels measureTime("readData", func() { f, err := os.Open(args[0]) if err != nil { exitWithError(err) } defer f.Close() metrics, err = readPrometheusLabels(f, b.numMetrics) if err != nil { exitWithError(err) } }) var total uint64 dur := measureTime("ingestScrapes", func() { b.startProfiling() total, err = b.ingestScrapes(metrics, 15000) if err != nil { exitWithError(err) } }) fmt.Println(" > total samples:", total) fmt.Println(" > samples/sec:", float64(total)/dur.Seconds()) select {} measureTime("stopStorage", func() { if err := b.storage.Close(); err != nil { exitWithError(err) } b.stopProfiling() }) } const timeDelta = 30000 func (b *writeBenchmark) ingestScrapes(lbls []labels.Labels, scrapeCount int) (uint64, error) { var mu sync.Mutex var total uint64 for i := 0; i < scrapeCount; i += 100 { var wg sync.WaitGroup lbls := lbls for len(lbls) > 0 { l := 1000 if len(lbls) < 1000 { l = len(lbls) } batch := lbls[:l] lbls = lbls[l:] wg.Add(1) go func() { n, err := b.ingestScrapesShard(batch, 100, int64(timeDelta*i)) if err != nil { // exitWithError(err) fmt.Println(" err", err) } mu.Lock() total += n mu.Unlock() wg.Done() }() } wg.Wait() } fmt.Println("ingestion completed") return total, nil } func (b *writeBenchmark) ingestScrapesShard(metrics []labels.Labels, scrapeCount int, baset int64) (uint64, error) { ts := baset type sample struct { labels labels.Labels value int64 ref *uint64 } scrape := make([]*sample, 0, len(metrics)) for _, m := range metrics { scrape = append(scrape, &sample{ labels: m, value: 123456789, }) } total := uint64(0) for i := 0; i < scrapeCount; i++ { app := b.storage.Appender() ts += timeDelta for _, s := range scrape { s.value += 1000 if s.ref == nil { ref, err := app.Add(s.labels, ts, float64(s.value)) if err != nil { panic(err) } s.ref = &ref } else if err := app.AddFast(*s.ref, ts, float64(s.value)); err != nil { if errors.Cause(err) != tsdb.ErrNotFound { panic(err) } ref, err := app.Add(s.labels, ts, float64(s.value)) if err != nil { panic(err) } s.ref = &ref } total++ } if err := app.Commit(); err != nil { return total, err } } return total, nil } func (b *writeBenchmark) startProfiling() { var err error // Start CPU profiling. b.cpuprof, err = os.Create(filepath.Join(b.outPath, "cpu.prof")) if err != nil { exitWithError(fmt.Errorf("bench: could not create cpu profile: %v", err)) } pprof.StartCPUProfile(b.cpuprof) // Start memory profiling. b.memprof, err = os.Create(filepath.Join(b.outPath, "mem.prof")) if err != nil { exitWithError(fmt.Errorf("bench: could not create memory profile: %v", err)) } runtime.MemProfileRate = 64 * 1024 // Start fatal profiling. b.blockprof, err = os.Create(filepath.Join(b.outPath, "block.prof")) if err != nil { exitWithError(fmt.Errorf("bench: could not create block profile: %v", err)) } runtime.SetBlockProfileRate(20) b.mtxprof, err = os.Create(filepath.Join(b.outPath, "mutex.prof")) if err != nil { exitWithError(fmt.Errorf("bench: could not create mutex profile: %v", err)) } runtime.SetMutexProfileFraction(20) } func (b *writeBenchmark) stopProfiling() { if b.cpuprof != nil { pprof.StopCPUProfile() b.cpuprof.Close() b.cpuprof = nil } if b.memprof != nil { pprof.Lookup("heap").WriteTo(b.memprof, 0) b.memprof.Close() b.memprof = nil } if b.blockprof != nil { pprof.Lookup("block").WriteTo(b.blockprof, 0) b.blockprof.Close() b.blockprof = nil runtime.SetBlockProfileRate(0) } if b.mtxprof != nil { pprof.Lookup("mutex").WriteTo(b.mtxprof, 0) b.mtxprof.Close() b.mtxprof = nil runtime.SetMutexProfileFraction(0) } } func measureTime(stage string, f func()) time.Duration { fmt.Printf(">> start stage=%s\n", stage) start := time.Now() f() fmt.Printf(">> completed stage=%s duration=%s\n", stage, time.Since(start)) return time.Since(start) } func readPrometheusLabels(r io.Reader, n int) ([]labels.Labels, error) { b, err := ioutil.ReadAll(r) if err != nil { return nil, err } p := textparse.New(b) i := 0 var mets []labels.Labels hashes := map[uint64]struct{}{} for p.Next() && i < n { m := make(labels.Labels, 0, 10) p.Metric((*promlabels.Labels)(unsafe.Pointer(&m))) h := m.Hash() if _, ok := hashes[h]; ok { continue } mets = append(mets, m) hashes[h] = struct{}{} i++ } return mets, p.Err() } func exitWithError(err error) { fmt.Fprintln(os.Stderr, err) os.Exit(1) }