prometheus/cmd/tsdb/main.go

386 lines
9.5 KiB
Go
Raw Normal View History

2017-04-10 18:59:45 +00:00
// Copyright 2017 The Prometheus Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
2016-12-07 16:30:10 +00:00
package main
import (
"bufio"
2016-12-09 12:41:38 +00:00
"flag"
2016-12-07 16:30:10 +00:00
"fmt"
"io"
"io/ioutil"
"os"
"path/filepath"
"runtime"
"runtime/pprof"
"sort"
"strconv"
"strings"
2016-12-07 16:30:10 +00:00
"sync"
2017-10-19 16:14:37 +00:00
"text/tabwriter"
2016-12-07 16:30:10 +00:00
"time"
"github.com/go-kit/kit/log"
"github.com/pkg/errors"
"github.com/prometheus/tsdb"
"github.com/prometheus/tsdb/labels"
"gopkg.in/alecthomas/kingpin.v2"
2016-12-07 16:30:10 +00:00
)
func main() {
var (
cli = kingpin.New(filepath.Base(os.Args[0]), "CLI tool for tsdb")
benchCmd = cli.Command("bench", "run benchmarks")
benchWriteCmd = benchCmd.Command("write", "run a write performance benchmark")
benchWriteOutPath = benchWriteCmd.Flag("out", "set the output path").Default("benchout").String()
benchWriteNumMetrics = benchWriteCmd.Flag("metrics", "number of metrics to read").Default("10000").Int()
benchSamplesFile = benchWriteCmd.Arg("file", "input file with samples data, default is ("+filepath.Join("..", "testdata", "20kseries.json")+")").Default(filepath.Join("..", "testdata", "20kseries.json")).String()
2017-10-01 20:18:50 +00:00
listCmd = cli.Command("ls", "list db blocks")
listCmdHumanReadable = listCmd.Flag("human-readable", "print human readable values").Short('h').Bool()
listPath = listCmd.Arg("db path", "database path (default is "+filepath.Join("benchout", "storage")+")").Default(filepath.Join("benchout", "storage")).String()
2016-12-07 16:30:10 +00:00
)
switch kingpin.MustParse(cli.Parse(os.Args[1:])) {
case benchWriteCmd.FullCommand():
wb := &writeBenchmark{
outPath: *benchWriteOutPath,
numMetrics: *benchWriteNumMetrics,
samplesFile: *benchSamplesFile,
}
wb.run()
2017-10-01 20:18:50 +00:00
case listCmd.FullCommand():
db, err := tsdb.Open(*listPath, nil, nil, nil)
if err != nil {
exitWithError(err)
}
printBlocks(db.Blocks(), listCmdHumanReadable)
2016-12-07 16:30:10 +00:00
}
flag.CommandLine.Set("log.level", "debug")
2016-12-07 16:30:10 +00:00
}
type writeBenchmark struct {
outPath string
samplesFile string
cleanup bool
numMetrics int
2016-12-07 16:30:10 +00:00
2017-02-19 15:04:37 +00:00
storage *tsdb.DB
2016-12-07 16:30:10 +00:00
cpuprof *os.File
memprof *os.File
blockprof *os.File
2017-05-14 09:51:56 +00:00
mtxprof *os.File
2016-12-07 16:30:10 +00:00
}
func (b *writeBenchmark) run() {
2016-12-07 16:30:10 +00:00
if b.outPath == "" {
dir, err := ioutil.TempDir("", "tsdb_bench")
if err != nil {
exitWithError(err)
}
b.outPath = dir
b.cleanup = true
}
if err := os.RemoveAll(b.outPath); err != nil {
exitWithError(err)
}
if err := os.MkdirAll(b.outPath, 0777); err != nil {
exitWithError(err)
}
dir := filepath.Join(b.outPath, "storage")
l := log.NewLogfmtLogger(log.NewSyncWriter(os.Stderr))
l = log.With(l, "ts", log.DefaultTimestampUTC, "caller", log.DefaultCaller)
st, err := tsdb.Open(dir, l, nil, &tsdb.Options{
2017-02-14 07:53:19 +00:00
WALFlushInterval: 200 * time.Millisecond,
RetentionDuration: 15 * 24 * 60 * 60 * 1000, // 15 days in milliseconds
2017-08-03 16:33:13 +00:00
BlockRanges: tsdb.ExponentialBlockRanges(2*60*60*1000, 5, 3),
2017-02-10 01:54:26 +00:00
})
if err != nil {
exitWithError(err)
2016-12-07 16:30:10 +00:00
}
b.storage = st
2017-01-16 20:29:53 +00:00
var metrics []labels.Labels
2016-12-07 16:30:10 +00:00
measureTime("readData", func() {
f, err := os.Open(b.samplesFile)
2016-12-07 16:30:10 +00:00
if err != nil {
exitWithError(err)
}
defer f.Close()
metrics, err = readPrometheusLabels(f, b.numMetrics)
if err != nil {
exitWithError(err)
}
})
var total uint64
dur := measureTime("ingestScrapes", func() {
2016-12-07 16:30:10 +00:00
b.startProfiling()
total, err = b.ingestScrapes(metrics, 3000)
if err != nil {
2016-12-07 16:30:10 +00:00
exitWithError(err)
}
})
fmt.Println(" > total samples:", total)
fmt.Println(" > samples/sec:", float64(total)/dur.Seconds())
2016-12-07 16:30:10 +00:00
measureTime("stopStorage", func() {
if err := b.storage.Close(); err != nil {
2016-12-07 16:30:10 +00:00
exitWithError(err)
}
if err := b.stopProfiling(); err != nil {
exitWithError(err)
}
2016-12-07 16:30:10 +00:00
})
}
// timeDelta is the timestamp step between two consecutive scrapes generated by
// the benchmark: ingestScrapesShard advances its timestamp by it on every
// scrape and ingestScrapes multiplies it by the scrape index to get the base
// timestamp of a round.
// NOTE(review): presumably milliseconds (i.e. 30s), matching the millisecond
// values passed to tsdb.Options in run — confirm.
const timeDelta = 30000
func (b *writeBenchmark) ingestScrapes(lbls []labels.Labels, scrapeCount int) (uint64, error) {
var mu sync.Mutex
var total uint64
2016-12-07 16:30:10 +00:00
for i := 0; i < scrapeCount; i += 100 {
var wg sync.WaitGroup
lbls := lbls
for len(lbls) > 0 {
l := 1000
if len(lbls) < 1000 {
l = len(lbls)
2016-12-07 16:30:10 +00:00
}
batch := lbls[:l]
lbls = lbls[l:]
wg.Add(1)
go func() {
n, err := b.ingestScrapesShard(batch, 100, int64(timeDelta*i))
if err != nil {
// exitWithError(err)
fmt.Println(" err", err)
}
mu.Lock()
total += n
mu.Unlock()
wg.Done()
}()
}
wg.Wait()
2016-12-07 16:30:10 +00:00
}
fmt.Println("ingestion completed")
return total, nil
2016-12-07 16:30:10 +00:00
}
func (b *writeBenchmark) ingestScrapesShard(metrics []labels.Labels, scrapeCount int, baset int64) (uint64, error) {
ts := baset
2016-12-07 16:30:10 +00:00
2016-12-08 09:04:24 +00:00
type sample struct {
2016-12-21 08:39:01 +00:00
labels labels.Labels
2016-12-08 09:04:24 +00:00
value int64
ref *uint64
2016-12-08 09:04:24 +00:00
}
scrape := make([]*sample, 0, len(metrics))
2016-12-08 09:04:24 +00:00
for _, m := range metrics {
scrape = append(scrape, &sample{
labels: m,
2016-12-08 09:04:24 +00:00
value: 123456789,
})
2016-12-08 09:04:24 +00:00
}
total := uint64(0)
2016-12-08 09:04:24 +00:00
2016-12-07 16:30:10 +00:00
for i := 0; i < scrapeCount; i++ {
app := b.storage.Appender()
ts += timeDelta
2016-12-07 16:30:10 +00:00
2016-12-08 09:04:24 +00:00
for _, s := range scrape {
2016-12-09 09:00:14 +00:00
s.value += 1000
if s.ref == nil {
ref, err := app.Add(s.labels, ts, float64(s.value))
if err != nil {
panic(err)
}
s.ref = &ref
} else if err := app.AddFast(*s.ref, ts, float64(s.value)); err != nil {
if errors.Cause(err) != tsdb.ErrNotFound {
panic(err)
}
ref, err := app.Add(s.labels, ts, float64(s.value))
if err != nil {
panic(err)
}
s.ref = &ref
}
total++
2016-12-07 16:30:10 +00:00
}
if err := app.Commit(); err != nil {
return total, err
2016-12-07 16:30:10 +00:00
}
}
return total, nil
2016-12-07 16:30:10 +00:00
}
func (b *writeBenchmark) startProfiling() {
var err error
// Start CPU profiling.
b.cpuprof, err = os.Create(filepath.Join(b.outPath, "cpu.prof"))
if err != nil {
2017-03-19 16:05:01 +00:00
exitWithError(fmt.Errorf("bench: could not create cpu profile: %v", err))
2016-12-07 16:30:10 +00:00
}
if err := pprof.StartCPUProfile(b.cpuprof); err != nil {
exitWithError(fmt.Errorf("bench: could not start CPU profile: %v", err))
}
2016-12-07 16:30:10 +00:00
// Start memory profiling.
b.memprof, err = os.Create(filepath.Join(b.outPath, "mem.prof"))
if err != nil {
2017-03-19 16:05:01 +00:00
exitWithError(fmt.Errorf("bench: could not create memory profile: %v", err))
2016-12-07 16:30:10 +00:00
}
2017-05-14 09:51:56 +00:00
runtime.MemProfileRate = 64 * 1024
2016-12-07 16:30:10 +00:00
// Start fatal profiling.
b.blockprof, err = os.Create(filepath.Join(b.outPath, "block.prof"))
if err != nil {
2017-03-19 16:05:01 +00:00
exitWithError(fmt.Errorf("bench: could not create block profile: %v", err))
2016-12-07 16:30:10 +00:00
}
2017-05-14 09:51:56 +00:00
runtime.SetBlockProfileRate(20)
b.mtxprof, err = os.Create(filepath.Join(b.outPath, "mutex.prof"))
if err != nil {
exitWithError(fmt.Errorf("bench: could not create mutex profile: %v", err))
}
runtime.SetMutexProfileFraction(20)
2016-12-07 16:30:10 +00:00
}
func (b *writeBenchmark) stopProfiling() error {
2016-12-07 16:30:10 +00:00
if b.cpuprof != nil {
pprof.StopCPUProfile()
b.cpuprof.Close()
b.cpuprof = nil
}
if b.memprof != nil {
if err := pprof.Lookup("heap").WriteTo(b.memprof, 0); err != nil {
return fmt.Errorf("error writing mem profile: %v", err)
}
2016-12-07 16:30:10 +00:00
b.memprof.Close()
b.memprof = nil
}
if b.blockprof != nil {
if err := pprof.Lookup("block").WriteTo(b.blockprof, 0); err != nil {
return fmt.Errorf("error writing block profile: %v", err)
}
2016-12-07 16:30:10 +00:00
b.blockprof.Close()
b.blockprof = nil
runtime.SetBlockProfileRate(0)
}
2017-05-14 09:51:56 +00:00
if b.mtxprof != nil {
if err := pprof.Lookup("mutex").WriteTo(b.mtxprof, 0); err != nil {
return fmt.Errorf("error writing mutex profile: %v", err)
}
2017-05-14 09:51:56 +00:00
b.mtxprof.Close()
b.mtxprof = nil
runtime.SetMutexProfileFraction(0)
}
return nil
2016-12-07 16:30:10 +00:00
}
// measureTime runs f, printing a start and a completion line for the given
// stage name to stdout, and returns how long f took.
//
// The elapsed time is measured exactly once so that the printed duration and
// the returned duration are the same value.
func measureTime(stage string, f func()) time.Duration {
	fmt.Printf(">> start stage=%s\n", stage)
	start := time.Now()
	f()
	elapsed := time.Since(start)
	fmt.Printf(">> completed stage=%s duration=%s\n", stage, elapsed)
	return elapsed
}
func mapToLabels(m map[string]interface{}, l *labels.Labels) {
for k, v := range m {
*l = append(*l, labels.Label{Name: k, Value: v.(string)})
2017-01-16 20:29:53 +00:00
}
}
func readPrometheusLabels(r io.Reader, n int) ([]labels.Labels, error) {
scanner := bufio.NewScanner(r)
2016-12-07 16:30:10 +00:00
2017-01-16 20:29:53 +00:00
var mets []labels.Labels
hashes := map[uint64]struct{}{}
i := 0
2016-12-07 16:30:10 +00:00
for scanner.Scan() && i < n {
2017-01-16 20:29:53 +00:00
m := make(labels.Labels, 0, 10)
r := strings.NewReplacer("\"", "", "{", "", "}", "")
s := r.Replace(scanner.Text())
labelChunks := strings.Split(s, ",")
for _, labelChunk := range labelChunks {
split := strings.Split(labelChunk, ":")
m = append(m, labels.Label{Name: split[0], Value: split[1]})
}
// Order of the k/v labels matters, don't assume we'll always receive them already sorted.
sort.Sort(m)
2017-01-16 20:29:53 +00:00
h := m.Hash()
if _, ok := hashes[h]; ok {
continue
2016-12-07 16:30:10 +00:00
}
2017-01-16 20:29:53 +00:00
mets = append(mets, m)
hashes[h] = struct{}{}
i++
2016-12-07 16:30:10 +00:00
}
return mets, nil
2016-12-07 16:30:10 +00:00
}
// exitWithError prints err on its own line to stderr and terminates the
// process with exit status 1. It never returns.
func exitWithError(err error) {
	fmt.Fprintf(os.Stderr, "%v\n", err)
	os.Exit(1)
}
2017-10-02 20:48:47 +00:00
func printBlocks(blocks []*tsdb.Block, humanReadable *bool) {
2017-10-19 16:14:37 +00:00
tw := tabwriter.NewWriter(os.Stdout, 0, 0, 2, ' ', 0)
2017-10-02 20:48:47 +00:00
defer tw.Flush()
fmt.Fprintln(tw, "BLOCK ULID\tMIN TIME\tMAX TIME\tNUM SAMPLES\tNUM CHUNKS\tNUM SERIES")
for _, b := range blocks {
2017-10-11 09:02:57 +00:00
meta := b.Meta()
2017-10-02 20:48:47 +00:00
fmt.Fprintf(tw,
"%v\t%v\t%v\t%v\t%v\t%v\n",
2017-10-11 09:02:57 +00:00
meta.ULID,
getFormatedTime(meta.MinTime, humanReadable),
getFormatedTime(meta.MaxTime, humanReadable),
2017-10-11 09:02:57 +00:00
meta.Stats.NumSamples,
meta.Stats.NumChunks,
meta.Stats.NumSeries,
2017-10-02 20:48:47 +00:00
)
}
}
// getFormatedTime renders timestamp for display. When *humanReadable is false
// the raw number is returned in base 10. Otherwise the timestamp is divided by
// 1000 (integer division, so the remainder is dropped), interpreted as Unix
// seconds and formatted with time.Time's default String representation.
//
// humanReadable must not be nil.
func getFormatedTime(timestamp int64, humanReadable *bool) string {
	if !*humanReadable {
		return strconv.FormatInt(timestamp, 10)
	}
	const perSecond = 1000
	return time.Unix(timestamp/perSecond, 0).String()
}