Add "dump" command to tsdb tool to dump all samples (#532)

* Add "dump" command to tsdb tool to dump all samples

Signed-off-by: Julius Volz <julius.volz@gmail.com>
This commit is contained in:
Julius Volz 2019-02-25 14:51:33 +01:00 committed by Krasi Georgiev
parent 10d395259b
commit 752e022aba
1 changed files with 49 additions and 4 deletions

View File

@ -19,6 +19,7 @@ import (
"fmt"
"io"
"io/ioutil"
"math"
"os"
"path/filepath"
"runtime"
@ -40,6 +41,8 @@ import (
func main() {
var (
defaultDBPath = filepath.Join("benchout", "storage")
cli = kingpin.New(filepath.Base(os.Args[0]), "CLI tool for tsdb")
benchCmd = cli.Command("bench", "run benchmarks")
benchWriteCmd = benchCmd.Command("write", "run a write performance benchmark")
@ -48,13 +51,20 @@ func main() {
benchSamplesFile = benchWriteCmd.Arg("file", "input file with samples data, default is ("+filepath.Join("..", "testdata", "20kseries.json")+")").Default(filepath.Join("..", "testdata", "20kseries.json")).String()
listCmd = cli.Command("ls", "list db blocks")
listCmdHumanReadable = listCmd.Flag("human-readable", "print human readable values").Short('h').Bool()
listPath = listCmd.Arg("db path", "database path (default is "+filepath.Join("benchout", "storage")+")").Default(filepath.Join("benchout", "storage")).String()
listPath = listCmd.Arg("db path", "database path (default is "+defaultDBPath+")").Default(defaultDBPath).String()
analyzeCmd = cli.Command("analyze", "analyze churn, label pair cardinality.")
analyzePath = analyzeCmd.Arg("db path", "database path (default is "+filepath.Join("benchout", "storage")+")").Default(filepath.Join("benchout", "storage")).String()
analyzePath = analyzeCmd.Arg("db path", "database path (default is "+defaultDBPath+")").Default(defaultDBPath).String()
analyzeBlockID = analyzeCmd.Arg("block id", "block to analyze (default is the last block)").String()
analyzeLimit = analyzeCmd.Flag("limit", "how many items to show in each list").Default("20").Int()
dumpCmd = cli.Command("dump", "dump samples from a TSDB")
dumpPath = dumpCmd.Arg("db path", "database path (default is "+defaultDBPath+")").Default(defaultDBPath).String()
dumpMinTime = dumpCmd.Flag("min-time", "minimum timestamp to dump").Default(strconv.FormatInt(math.MinInt64, 10)).Int64()
dumpMaxTime = dumpCmd.Flag("max-time", "maximum timestamp to dump").Default(strconv.FormatInt(math.MaxInt64, 10)).Int64()
)
safeDBOptions := *tsdb.DefaultOptions
safeDBOptions.RetentionDuration = 0
switch kingpin.MustParse(cli.Parse(os.Args[1:])) {
case benchWriteCmd.FullCommand():
wb := &writeBenchmark{
@ -64,13 +74,13 @@ func main() {
}
wb.run()
case listCmd.FullCommand():
db, err := tsdb.Open(*listPath, nil, nil, nil)
db, err := tsdb.Open(*listPath, nil, nil, &safeDBOptions)
if err != nil {
exitWithError(err)
}
printBlocks(db.Blocks(), listCmdHumanReadable)
case analyzeCmd.FullCommand():
db, err := tsdb.Open(*analyzePath, nil, nil, nil)
db, err := tsdb.Open(*analyzePath, nil, nil, &safeDBOptions)
if err != nil {
exitWithError(err)
}
@ -90,6 +100,12 @@ func main() {
exitWithError(fmt.Errorf("block not found"))
}
analyzeBlock(block, *analyzeLimit)
case dumpCmd.FullCommand():
db, err := tsdb.Open(*dumpPath, nil, nil, &safeDBOptions)
if err != nil {
exitWithError(err)
}
dumpSamples(db, *dumpMinTime, *dumpMaxTime)
}
flag.CommandLine.Set("log.level", "debug")
}
@ -531,3 +547,32 @@ func analyzeBlock(b *tsdb.Block, limit int) {
fmt.Printf("\nHighest cardinality metric names:\n")
printInfo(postingInfos)
}
func dumpSamples(db *tsdb.DB, mint, maxt int64) {
q, err := db.Querier(mint, maxt)
if err != nil {
exitWithError(err)
}
ss, err := q.Select(labels.NewMustRegexpMatcher("", ".*"))
if err != nil {
exitWithError(err)
}
for ss.Next() {
series := ss.At()
labels := series.Labels()
it := series.Iterator()
for it.Next() {
ts, val := it.At()
fmt.Printf("%s %g %d\n", labels, val, ts)
}
if it.Err() != nil {
exitWithError(ss.Err())
}
}
if ss.Err() != nil {
exitWithError(ss.Err())
}
}