diff --git a/cmd/tsdb/Makefile b/cmd/tsdb/Makefile
index 221ca893c..91c11e39e 100644
--- a/cmd/tsdb/Makefile
+++ b/cmd/tsdb/Makefile
@@ -1,16 +1,9 @@
 build:
 	@go build .
 
-bench_default: build
+bench: build
 	@echo ">> running benchmark"
-	@./tsdb bench write --out=benchout/default --engine=default --metrics=$(NUM_METRICS) testdata.1m
-	@go tool pprof -svg ./tsdb benchout/default/cpu.prof > benchout/default/cpuprof.svg
-	@go tool pprof -svg ./tsdb benchout/default/mem.prof > benchout/default/memprof.svg
-	@go tool pprof -svg ./tsdb benchout/default/block.prof > benchout/default/blockprof.svg
-
-bench_tsdb: build
-	@echo ">> running benchmark"
-	@./tsdb bench write --out=benchout/tsdb --engine=tsdb --metrics=$(NUM_METRICS) testdata.1m
-	@go tool pprof -svg ./tsdb benchout/tsdb/cpu.prof > benchout/tsdb/cpuprof.svg
-	@go tool pprof -svg ./tsdb benchout/tsdb/mem.prof > benchout/tsdb/memprof.svg
-	@go tool pprof -svg ./tsdb benchout/tsdb/block.prof > benchout/tsdb/blockprof.svg
+	@./tsdb bench write --out=benchout/ --metrics=$(NUM_METRICS) testdata.100k
+	@go tool pprof -svg ./tsdb benchout/cpu.prof > benchout/cpuprof.svg
+	@go tool pprof -svg ./tsdb benchout/mem.prof > benchout/memprof.svg
+	@go tool pprof -svg ./tsdb benchout/block.prof > benchout/blockprof.svg
diff --git a/cmd/tsdb/main.go b/cmd/tsdb/main.go
index 279ce4460..6edb483a4 100644
--- a/cmd/tsdb/main.go
+++ b/cmd/tsdb/main.go
@@ -17,7 +17,6 @@ import (
 	dto "github.com/prometheus/client_model/go"
 	"github.com/prometheus/common/expfmt"
 	"github.com/prometheus/common/model"
-	"github.com/prometheus/prometheus/storage/local"
 	"github.com/spf13/cobra"
 )
 
@@ -49,10 +48,9 @@ func NewBenchCommand() *cobra.Command {
 type writeBenchmark struct {
 	outPath    string
 	cleanup    bool
-	engine     string
 	numMetrics int
 
-	storage benchmarkStorage
+	storage *tsdb.DB
 
 	cpuprof *os.File
 	memprof *os.File
@@ -66,7 +64,6 @@ func NewBenchWriteCommand() *cobra.Command {
 		Short: "run a write performance benchmark",
 		Run:   wb.run,
 	}
-	c.PersistentFlags().StringVar(&wb.engine, "engine", "tsdb", "the storage engine to use")
 	c.PersistentFlags().StringVar(&wb.outPath, "out", "benchout/", "set the output path")
 	c.PersistentFlags().IntVar(&wb.numMetrics, "metrics", 10000, "number of metrics to read")
 	return c
@@ -93,22 +90,12 @@ func (b *writeBenchmark) run(cmd *cobra.Command, args []string) {
 
 	dir := filepath.Join(b.outPath, "storage")
 
-	switch b.engine {
-	case "tsdb":
-		st, err := newTSDBStorage(dir)
-		if err != nil {
-			exitWithError(err)
-		}
-		b.storage = st
-	case "default":
-		st, err := newDefaultStorage(dir)
-		if err != nil {
-			exitWithError(err)
-		}
-		b.storage = st
-	default:
-		exitWithError(fmt.Errorf("unknown storage engine %q", b.engine))
+	st, err := tsdb.Open(dir, nil, nil)
+	if err != nil {
+		exitWithError(err)
 	}
+	b.storage = st
+
 	var metrics []model.Metric
 
 	measureTime("readData", func() {
@@ -138,7 +125,7 @@ func (b *writeBenchmark) run(cmd *cobra.Command, args []string) {
 		}
 	})
 	measureTime("stopStorage", func() {
-		if err := b.storage.stop(); err != nil {
+		if err := b.storage.Close(); err != nil {
 			exitWithError(err)
 		}
 		b.stopProfiling()
@@ -171,7 +158,7 @@ func (b *writeBenchmark) ingestScrapes(metrics []model.Metric, scrapeCount int)
 }
 
 func (b *writeBenchmark) ingestScrapesShard(metrics []model.Metric, scrapeCount int) error {
-	var sc tsdb.Vector
+	app := b.storage.Appender()
 	ts := int64(model.Now())
 
 	type sample struct {
@@ -196,82 +183,18 @@ func (b *writeBenchmark) ingestScrapesShard(metrics []model.Metric, scrapeCount
 
 	for i := 0; i < scrapeCount; i++ {
 		ts += int64(30000)
-		sc.Reset()
 
 		for _, s := range scrape {
 			s.value += 1000
-			sc.Add(s.labels, float64(s.value))
+			app.Add(s.labels, ts, float64(s.value))
 		}
 
-		if err := b.storage.ingestScrape(ts, &sc); err != nil {
+		if err := app.Commit(); err != nil {
 			return err
 		}
 	}
 	return nil
 }
 
-type benchmarkStorage interface {
-	ingestScrape(int64, *tsdb.Vector) error
-	stop() error
-}
-
-type tsdbStorage struct {
-	c *tsdb.DB
-}
-
-func (c *tsdbStorage) stop() error {
-	return c.c.Close()
-}
-
-func (c *tsdbStorage) ingestScrape(ts int64, s *tsdb.Vector) error {
-	return c.c.AppendVector(ts, s)
-}
-
-func newTSDBStorage(path string) (*tsdbStorage, error) {
-	c, err := tsdb.Open(path, nil, nil)
-	if err != nil {
-		return nil, err
-	}
-	return &tsdbStorage{
-		c: c,
-	}, nil
-}
-
-type defaultStorage struct {
-	s local.Storage
-}
-
-func (s *defaultStorage) ingestScrape(ts int64, scrape *tsdb.Vector) error {
-	for _, samples := range scrape.Buckets {
-		for _, smpl := range samples {
-			met := make(model.Metric, len(smpl.Labels))
-			for _, l := range smpl.Labels {
-				met[model.LabelName(l.Name)] = model.LabelValue(l.Value)
-			}
-			if err := s.s.Append(&model.Sample{
-				Metric:    met,
-				Timestamp: model.Time(ts),
-				Value:     model.SampleValue(smpl.Value),
-			}); err != nil {
-				return err
-			}
-		}
-	}
-	return nil
-}
-
-func (s *defaultStorage) stop() error {
-	return s.s.Stop()
-}
-
-func newDefaultStorage(path string) (*defaultStorage, error) {
-	s := local.NewMemorySeriesStorage(&local.MemorySeriesStorageOptions{
-		PersistenceStoragePath: path,
-		SyncStrategy:           local.Adaptive,
-		CheckpointInterval:     5 * time.Minute,
-	})
-	return &defaultStorage{s: s}, s.Start()
-}
-
 func (b *writeBenchmark) startProfiling() {
 	var err error
diff --git a/db.go b/db.go
index 7613bb3a0..e07ecb09a 100644
--- a/db.go
+++ b/db.go
@@ -42,7 +42,7 @@ type DB struct {
 
 // TODO(fabxc): make configurable
 const (
-	shardShift   = 0
+	shardShift   = 3
 	numShards    = 1 << shardShift
 	maxChunkSize = 1024
 )
@@ -103,99 +103,70 @@ func (db *DB) Close() error {
 	return g.Wait()
 }
 
-// Appender adds a batch of samples.
+// Appender allows committing batches of samples to a database.
+// The data held by the appender is reset after Commit returns.
 type Appender interface {
-	// Add adds a sample pair to the appended batch.
-	Add(l Labels, t int64, v float64)
+	// AddSeries registers a new known series label set with the appender
+	// and returns a reference number used to add samples to it over the
+	// life time of the Appender.
+	// AddSeries(Labels) uint64
 
-	// Commit submits the collected samples.
+	// Add adds a sample pair for the referenced series.
+	Add(lset Labels, t int64, v float64)
+
+	// Commit submits the collected samples and purges the batch.
 	Commit() error
 }
 
-// Vector is a set of LabelSet associated with one value each.
-// Label sets and values must have equal length.
-type Vector struct {
-	Buckets map[uint16][]Sample
-	reused  int
-}
-
-type Sample struct {
-	Hash   uint64
-	Labels Labels
-	Value  float64
-}
-
-// Reset the vector but keep resources allocated.
-func (v *Vector) Reset() {
-	// Do a full reset every n-th reusage to avoid memory leaks.
-	if v.Buckets == nil || v.reused > 100 {
-		v.Buckets = make(map[uint16][]Sample, 0)
-		return
+// Appender returns a new appender against the database.
+func (db *DB) Appender() Appender {
+	return &bucketAppender{
+		db:      db,
+		buckets: make([][]hashedSample, numShards),
 	}
-	for x, bkt := range v.Buckets {
-		v.Buckets[x] = bkt[:0]
-	}
-	v.reused++
 }
 
-// Add a sample to the vector.
-func (v *Vector) Add(lset Labels, val float64) {
+type bucketAppender struct {
+	db      *DB
+	buckets [][]hashedSample
+}
+
+func (ba *bucketAppender) Add(lset Labels, t int64, v float64) {
 	h := lset.Hash()
-	s := uint16(h >> (64 - shardShift))
+	s := h >> (64 - shardShift)
 
-	v.Buckets[s] = append(v.Buckets[s], Sample{
-		Hash:   h,
-		Labels: lset,
-		Value:  val,
+	ba.buckets[s] = append(ba.buckets[s], hashedSample{
+		hash:   h,
+		labels: lset,
+		t:      t,
+		v:      v,
 	})
 }
 
-// func (db *DB) Appender() Appender {
-// 	return &bucketAppender{
-// 		samples: make([]Sample, 1024),
-// 	}
-// }
-
-// type bucketAppender struct {
-// 	db *DB
-// 	// buckets []Sam
-// }
-
-// func (a *bucketAppender) Add(l Labels, t int64, v float64) {
-
-// }
-
-// func (a *bucketAppender) Commit() error {
-// 	// f
-// }
-
-// AppendVector adds values for a list of label sets for the given timestamp
-// in milliseconds.
-func (db *DB) AppendVector(ts int64, v *Vector) error {
-	// Sequentially add samples to shards.
-	for s, bkt := range v.Buckets {
-		shard := db.shards[s]
-		if err := shard.appendBatch(ts, bkt); err != nil {
-			// TODO(fabxc): handle gracefully and collect multi-error.
-			return err
-		}
+func (ba *bucketAppender) reset() {
+	for i := range ba.buckets {
+		ba.buckets[i] = ba.buckets[i][:0]
 	}
-
-	return nil
 }
 
-func (db *DB) AppendSingle(lset Labels, ts int64, v float64) error {
-	sort.Sort(lset)
-	h := lset.Hash()
-	s := uint16(h >> (64 - shardShift))
+func (ba *bucketAppender) Commit() error {
+	defer ba.reset()
 
-	return db.shards[s].appendBatch(ts, []Sample{
-		{
-			Hash:   h,
-			Labels: lset,
-			Value:  v,
-		},
-	})
+	var merr MultiError
+
+	// Spill buckets into shards.
+	for s, b := range ba.buckets {
+		merr.Add(ba.db.shards[s].appendBatch(b))
+	}
+	return merr.Err()
+}
+
+type hashedSample struct {
+	hash   uint64
+	labels Labels
+
+	t int64
+	v float64
 }
 
 const sep = '\xff'
@@ -253,22 +224,14 @@ func (s *Shard) Close() error {
 	return e.Err()
 }
 
-func (s *Shard) appendBatch(ts int64, samples []Sample) error {
-	// TODO(fabxc): make configurable.
-	const persistenceTimeThreshold = 1000 * 60 * 60 // 1 hour if timestamp in ms
-
+func (s *Shard) appendBatch(samples []hashedSample) error {
 	s.mtx.Lock()
 	defer s.mtx.Unlock()
 
-	for _, smpl := range samples {
-		if err := s.head.append(smpl.Hash, smpl.Labels, ts, smpl.Value); err != nil {
-			// TODO(fabxc): handle gracefully and collect multi-error.
-			return err
-		}
-	}
+	var merr MultiError
 
-	if ts > s.head.stats.MaxTime {
-		s.head.stats.MaxTime = ts
+	for _, sm := range samples {
+		merr.Add(s.head.append(sm.hash, sm.labels, sm.t, sm.v))
 	}
 
 	// TODO(fabxc): randomize over time
@@ -284,7 +247,7 @@ func (s *Shard) appendBatch(ts int64, samples []Sample) error {
 		}
 	}
 
-	return nil
+	return merr.Err()
 }
 
 func intervalOverlap(amin, amax, bmin, bmax int64) bool {
@@ -320,8 +283,6 @@ func (s *Shard) blocksForInterval(mint, maxt int64) []block {
 		bs = append(bs, s.head)
 	}
 
-	fmt.Println("blocks for interval", bs)
-
 	return bs
 }
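After this change the write path goes through a single shard-aware appender: Add hashes each label set and buffers the sample in one of numShards (1 << 3 = 8) per-shard buckets, and Commit spills every bucket into its shard, collecting failures into a MultiError and resetting the buckets for reuse. A minimal usage sketch of the new API follows, assuming the import path github.com/fabxc/tsdb and a Labels literal with Name/Value fields (suggested by the removed defaultStorage code, but not confirmed by this diff):

package main

import (
	"log"
	"time"

	"github.com/fabxc/tsdb" // assumed import path for this repository
)

func main() {
	// Open a database as cmd/tsdb now does; the two nil arguments
	// mirror the benchmark's call to tsdb.Open.
	db, err := tsdb.Open("benchout/storage", nil, nil)
	if err != nil {
		log.Fatal(err)
	}
	defer db.Close()

	// One appender per batch of samples, e.g. per scrape.
	app := db.Appender()

	// Timestamps are in milliseconds, matching the benchmark's use
	// of model.Now().
	ts := time.Now().UnixNano() / int64(time.Millisecond)

	// Labels with Name/Value fields; this literal syntax is an assumption.
	lset := tsdb.Labels{
		{Name: "__name__", Value: "requests_total"},
		{Name: "job", Value: "demo"},
	}

	// Add buffers the sample into the bucket for shard h >> (64 - shardShift).
	app.Add(lset, ts, 42)

	// Commit spills all buckets into their shards and resets the appender,
	// returning a MultiError if any shard append failed.
	if err := app.Commit(); err != nil {
		log.Fatal(err)
	}
}

With the Makefile consolidated into a single target, the benchmark would now be run as, e.g., make bench NUM_METRICS=10000.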