Redfine append interface, remove old Prometheus storage from bench

This commit is contained in:
Fabian Reinartz 2016-12-21 00:02:37 +01:00
parent cddc29fa17
commit ee217adc7e
3 changed files with 68 additions and 191 deletions

View File

@ -1,16 +1,9 @@
build:
@go build .
bench_default: build
bench: build
@echo ">> running benchmark"
@./tsdb bench write --out=benchout/default --engine=default --metrics=$(NUM_METRICS) testdata.1m
@go tool pprof -svg ./tsdb benchout/default/cpu.prof > benchout/default/cpuprof.svg
@go tool pprof -svg ./tsdb benchout/default/mem.prof > benchout/default/memprof.svg
@go tool pprof -svg ./tsdb benchout/default/block.prof > benchout/default/blockprof.svg
bench_tsdb: build
@echo ">> running benchmark"
@./tsdb bench write --out=benchout/tsdb --engine=tsdb --metrics=$(NUM_METRICS) testdata.1m
@go tool pprof -svg ./tsdb benchout/tsdb/cpu.prof > benchout/tsdb/cpuprof.svg
@go tool pprof -svg ./tsdb benchout/tsdb/mem.prof > benchout/tsdb/memprof.svg
@go tool pprof -svg ./tsdb benchout/tsdb/block.prof > benchout/tsdb/blockprof.svg
@./tsdb bench write --out=benchout/ --metrics=$(NUM_METRICS) testdata.100k
@go tool pprof -svg ./tsdb benchout/cpu.prof > benchout/cpuprof.svg
@go tool pprof -svg ./tsdb benchout/mem.prof > benchout/memprof.svg
@go tool pprof -svg ./tsdb benchout/block.prof > benchout/blockprof.svg

View File

@ -17,7 +17,6 @@ import (
dto "github.com/prometheus/client_model/go"
"github.com/prometheus/common/expfmt"
"github.com/prometheus/common/model"
"github.com/prometheus/prometheus/storage/local"
"github.com/spf13/cobra"
)
@ -49,10 +48,9 @@ func NewBenchCommand() *cobra.Command {
type writeBenchmark struct {
outPath string
cleanup bool
engine string
numMetrics int
storage benchmarkStorage
storage *tsdb.DB
cpuprof *os.File
memprof *os.File
@ -66,7 +64,6 @@ func NewBenchWriteCommand() *cobra.Command {
Short: "run a write performance benchmark",
Run: wb.run,
}
c.PersistentFlags().StringVar(&wb.engine, "engine", "tsdb", "the storage engine to use")
c.PersistentFlags().StringVar(&wb.outPath, "out", "benchout/", "set the output path")
c.PersistentFlags().IntVar(&wb.numMetrics, "metrics", 10000, "number of metrics to read")
return c
@ -93,22 +90,12 @@ func (b *writeBenchmark) run(cmd *cobra.Command, args []string) {
dir := filepath.Join(b.outPath, "storage")
switch b.engine {
case "tsdb":
st, err := newTSDBStorage(dir)
st, err := tsdb.Open(dir, nil, nil)
if err != nil {
exitWithError(err)
}
b.storage = st
case "default":
st, err := newDefaultStorage(dir)
if err != nil {
exitWithError(err)
}
b.storage = st
default:
exitWithError(fmt.Errorf("unknown storage engine %q", b.engine))
}
var metrics []model.Metric
measureTime("readData", func() {
@ -138,7 +125,7 @@ func (b *writeBenchmark) run(cmd *cobra.Command, args []string) {
}
})
measureTime("stopStorage", func() {
if err := b.storage.stop(); err != nil {
if err := b.storage.Close(); err != nil {
exitWithError(err)
}
b.stopProfiling()
@ -171,7 +158,7 @@ func (b *writeBenchmark) ingestScrapes(metrics []model.Metric, scrapeCount int)
}
func (b *writeBenchmark) ingestScrapesShard(metrics []model.Metric, scrapeCount int) error {
var sc tsdb.Vector
app := b.storage.Appender()
ts := int64(model.Now())
type sample struct {
@ -196,82 +183,18 @@ func (b *writeBenchmark) ingestScrapesShard(metrics []model.Metric, scrapeCount
for i := 0; i < scrapeCount; i++ {
ts += int64(30000)
sc.Reset()
for _, s := range scrape {
s.value += 1000
sc.Add(s.labels, float64(s.value))
app.Add(s.labels, ts, float64(s.value))
}
if err := b.storage.ingestScrape(ts, &sc); err != nil {
if err := app.Commit(); err != nil {
return err
}
}
return nil
}
type benchmarkStorage interface {
ingestScrape(int64, *tsdb.Vector) error
stop() error
}
type tsdbStorage struct {
c *tsdb.DB
}
func (c *tsdbStorage) stop() error {
return c.c.Close()
}
func (c *tsdbStorage) ingestScrape(ts int64, s *tsdb.Vector) error {
return c.c.AppendVector(ts, s)
}
func newTSDBStorage(path string) (*tsdbStorage, error) {
c, err := tsdb.Open(path, nil, nil)
if err != nil {
return nil, err
}
return &tsdbStorage{
c: c,
}, nil
}
type defaultStorage struct {
s local.Storage
}
func (s *defaultStorage) ingestScrape(ts int64, scrape *tsdb.Vector) error {
for _, samples := range scrape.Buckets {
for _, smpl := range samples {
met := make(model.Metric, len(smpl.Labels))
for _, l := range smpl.Labels {
met[model.LabelName(l.Name)] = model.LabelValue(l.Value)
}
if err := s.s.Append(&model.Sample{
Metric: met,
Timestamp: model.Time(ts),
Value: model.SampleValue(smpl.Value),
}); err != nil {
return err
}
}
}
return nil
}
func (s *defaultStorage) stop() error {
return s.s.Stop()
}
func newDefaultStorage(path string) (*defaultStorage, error) {
s := local.NewMemorySeriesStorage(&local.MemorySeriesStorageOptions{
PersistenceStoragePath: path,
SyncStrategy: local.Adaptive,
CheckpointInterval: 5 * time.Minute,
})
return &defaultStorage{s: s}, s.Start()
}
func (b *writeBenchmark) startProfiling() {
var err error

145
db.go
View File

@ -42,7 +42,7 @@ type DB struct {
// TODO(fabxc): make configurable
const (
shardShift = 0
shardShift = 3
numShards = 1 << shardShift
maxChunkSize = 1024
)
@ -103,99 +103,70 @@ func (db *DB) Close() error {
return g.Wait()
}
// Appender adds a batch of samples.
// Appender allows committing batches of samples to a database.
// The data held by the appender is reset after Commit returns.
type Appender interface {
// Add adds a sample pair to the appended batch.
Add(l Labels, t int64, v float64)
// AddSeries registers a new known series label set with the appender
// and returns a reference number used to add samples to it over the
// life time of the Appender.
// AddSeries(Labels) uint64
// Commit submits the collected samples.
// Add adds a sample pair for the referenced series.
Add(lset Labels, t int64, v float64)
// Commit submits the collected samples and purges the batch.
Commit() error
}
// Vector is a set of LabelSet associated with one value each.
// Label sets and values must have equal length.
type Vector struct {
Buckets map[uint16][]Sample
reused int
}
type Sample struct {
Hash uint64
Labels Labels
Value float64
}
// Reset the vector but keep resources allocated.
func (v *Vector) Reset() {
// Do a full reset every n-th reusage to avoid memory leaks.
if v.Buckets == nil || v.reused > 100 {
v.Buckets = make(map[uint16][]Sample, 0)
return
// Appender returns a new appender against the database.
func (db *DB) Appender() Appender {
return &bucketAppender{
db: db,
buckets: make([][]hashedSample, numShards),
}
for x, bkt := range v.Buckets {
v.Buckets[x] = bkt[:0]
}
v.reused++
}
// Add a sample to the vector.
func (v *Vector) Add(lset Labels, val float64) {
type bucketAppender struct {
db *DB
buckets [][]hashedSample
}
func (ba *bucketAppender) Add(lset Labels, t int64, v float64) {
h := lset.Hash()
s := uint16(h >> (64 - shardShift))
s := h >> (64 - shardShift)
v.Buckets[s] = append(v.Buckets[s], Sample{
Hash: h,
Labels: lset,
Value: val,
ba.buckets[s] = append(ba.buckets[s], hashedSample{
hash: h,
labels: lset,
t: t,
v: v,
})
}
// func (db *DB) Appender() Appender {
// return &bucketAppender{
// samples: make([]Sample, 1024),
// }
// }
// type bucketAppender struct {
// db *DB
// // buckets []Sam
// }
// func (a *bucketAppender) Add(l Labels, t int64, v float64) {
// }
// func (a *bucketAppender) Commit() error {
// // f
// }
// AppendVector adds values for a list of label sets for the given timestamp
// in milliseconds.
func (db *DB) AppendVector(ts int64, v *Vector) error {
// Sequentially add samples to shards.
for s, bkt := range v.Buckets {
shard := db.shards[s]
if err := shard.appendBatch(ts, bkt); err != nil {
// TODO(fabxc): handle gracefully and collect multi-error.
return err
func (ba *bucketAppender) reset() {
for i := range ba.buckets {
ba.buckets[i] = ba.buckets[i][:0]
}
}
return nil
}
func (db *DB) AppendSingle(lset Labels, ts int64, v float64) error {
sort.Sort(lset)
h := lset.Hash()
s := uint16(h >> (64 - shardShift))
func (ba *bucketAppender) Commit() error {
defer ba.reset()
return db.shards[s].appendBatch(ts, []Sample{
{
Hash: h,
Labels: lset,
Value: v,
},
})
var merr MultiError
// Spill buckets into shards.
for s, b := range ba.buckets {
merr.Add(ba.db.shards[s].appendBatch(b))
}
return merr.Err()
}
type hashedSample struct {
hash uint64
labels Labels
t int64
v float64
}
const sep = '\xff'
@ -253,22 +224,14 @@ func (s *Shard) Close() error {
return e.Err()
}
func (s *Shard) appendBatch(ts int64, samples []Sample) error {
// TODO(fabxc): make configurable.
const persistenceTimeThreshold = 1000 * 60 * 60 // 1 hour if timestamp in ms
func (s *Shard) appendBatch(samples []hashedSample) error {
s.mtx.Lock()
defer s.mtx.Unlock()
for _, smpl := range samples {
if err := s.head.append(smpl.Hash, smpl.Labels, ts, smpl.Value); err != nil {
// TODO(fabxc): handle gracefully and collect multi-error.
return err
}
}
var merr MultiError
if ts > s.head.stats.MaxTime {
s.head.stats.MaxTime = ts
for _, sm := range samples {
merr.Add(s.head.append(sm.hash, sm.labels, sm.t, sm.v))
}
// TODO(fabxc): randomize over time
@ -284,7 +247,7 @@ func (s *Shard) appendBatch(ts int64, samples []Sample) error {
}
}
return nil
return merr.Err()
}
func intervalOverlap(amin, amax, bmin, bmax int64) bool {
@ -320,8 +283,6 @@ func (s *Shard) blocksForInterval(mint, maxt int64) []block {
bs = append(bs, s.head)
}
fmt.Println("blocks for interval", bs)
return bs
}