From 3ed2c2a14bc9c67e151f193185654b500f361ce5 Mon Sep 17 00:00:00 2001 From: Fabian Reinartz Date: Fri, 6 Jan 2017 11:40:09 +0100 Subject: [PATCH] Rename Partition to regular DB, DB to PartitionedDB --- cmd/tsdb/main.go | 4 +- compact.go | 38 ++--- db.go | 385 +++++++++++++++++++++++------------------------ querier.go | 8 +- 4 files changed, 215 insertions(+), 220 deletions(-) diff --git a/cmd/tsdb/main.go b/cmd/tsdb/main.go index 705c09d0d..034db8a32 100644 --- a/cmd/tsdb/main.go +++ b/cmd/tsdb/main.go @@ -51,7 +51,7 @@ type writeBenchmark struct { cleanup bool numMetrics int - storage *tsdb.DB + storage *tsdb.PartitionedDB cpuprof *os.File memprof *os.File @@ -91,7 +91,7 @@ func (b *writeBenchmark) run(cmd *cobra.Command, args []string) { dir := filepath.Join(b.outPath, "storage") - st, err := tsdb.Open(dir, nil, nil) + st, err := tsdb.OpenPartitioned(dir, 1, nil, nil) if err != nil { exitWithError(err) } diff --git a/compact.go b/compact.go index 5cbcd8857..4f3838c02 100644 --- a/compact.go +++ b/compact.go @@ -30,34 +30,34 @@ type compactorMetrics struct { duration prometheus.Histogram } -func newCompactorMetrics(i int) *compactorMetrics { - partitionLabel := prometheus.Labels{ - "partition": fmt.Sprintf("%d", i), - } - +func newCompactorMetrics(r prometheus.Registerer) *compactorMetrics { m := &compactorMetrics{} m.triggered = prometheus.NewCounter(prometheus.CounterOpts{ - Name: "tsdb_partition_compactions_triggered_total", - Help: "Total number of triggered compactions for the partition.", - ConstLabels: partitionLabel, + Name: "tsdb_compactions_triggered_total", + Help: "Total number of triggered compactions for the partition.", }) m.ran = prometheus.NewCounter(prometheus.CounterOpts{ - Name: "tsdb_partition_compactions_total", - Help: "Total number of compactions that were executed for the partition.", - ConstLabels: partitionLabel, + Name: "tsdb_compactions_total", + Help: "Total number of compactions that were executed for the partition.", }) m.failed = prometheus.NewCounter(prometheus.CounterOpts{ - Name: "tsdb_partition_compactions_failed_total", - Help: "Total number of compactions that failed for the partition.", - ConstLabels: partitionLabel, + Name: "tsdb_compactions_failed_total", + Help: "Total number of compactions that failed for the partition.", }) m.duration = prometheus.NewHistogram(prometheus.HistogramOpts{ - Name: "tsdb_partition_compaction_duration", - Help: "Duration of compaction runs.", - ConstLabels: partitionLabel, + Name: "tsdb_compaction_duration", + Help: "Duration of compaction runs.", }) + if r != nil { + r.MustRegister( + m.triggered, + m.ran, + m.failed, + m.duration, + ) + } return m } @@ -67,13 +67,13 @@ type compactableBlocks interface { reinit(dir string) error } -func newCompactor(i int, blocks compactableBlocks, l log.Logger) (*compactor, error) { +func newCompactor(blocks compactableBlocks, l log.Logger) (*compactor, error) { c := &compactor{ triggerc: make(chan struct{}, 1), donec: make(chan struct{}), logger: l, blocks: blocks, - metrics: newCompactorMetrics(i), + metrics: newCompactorMetrics(nil), } go c.run() diff --git a/db.go b/db.go index 9d02a13f1..eff94456d 100644 --- a/db.go +++ b/db.go @@ -24,7 +24,7 @@ import ( ) // DefaultOptions used for the DB. They are sane for setups using -// millisecond precision timestamps. +// millisecond precision timestampdb. var DefaultOptions = &Options{ Retention: 15 * 24 * 3600 * 1000, // 15 days DisableWAL: false, @@ -36,134 +36,21 @@ type Options struct { DisableWAL bool } -// DB is a time series storage. -type DB struct { - logger log.Logger - opts *Options - path string - - partitions []*Partition -} - -// TODO(fabxc): make configurable -const ( - partitionShift = 0 - numPartitions = 1 << partitionShift - maxChunkSize = 1024 -) - -// Open or create a new DB. -func Open(path string, l log.Logger, opts *Options) (*DB, error) { - if opts == nil { - opts = DefaultOptions - } - if err := os.MkdirAll(path, 0777); err != nil { - return nil, err - } - if l == nil { - l = log.NewLogfmtLogger(os.Stdout) - l = log.NewContext(l).With("ts", log.DefaultTimestampUTC, "caller", log.DefaultCaller) - } - - c := &DB{ - logger: l, - opts: opts, - path: path, - } - - // Initialize vertical partitions. - // TODO(fabxc): validate partition number to be power of 2, which is required - // for the bitshift-modulo when finding the right partition. - for i := 0; i < numPartitions; i++ { - l := log.NewContext(l).With("partition", i) - d := partitionDir(path, i) - - s, err := OpenPartition(d, i, l) - if err != nil { - return nil, fmt.Errorf("initializing partition %q failed: %s", d, err) - } - - c.partitions = append(c.partitions, s) - } - - return c, nil -} - -func partitionDir(base string, i int) string { - return filepath.Join(base, strconv.Itoa(i)) -} - -// Close the database. -func (db *DB) Close() error { - var g errgroup.Group - - for _, partition := range db.partitions { - g.Go(partition.Close) - } - - return g.Wait() -} - // Appender allows committing batches of samples to a database. -// The data held by the appender is reset after Commit returns. +// The data held by the appender is reset after Commit returndb. type Appender interface { // AddSeries registers a new known series label set with the appender // and returns a reference number used to add samples to it over the // life time of the Appender. // AddSeries(Labels) uint64 - // Add adds a sample pair for the referenced series. + // Add adds a sample pair for the referenced seriedb. Add(lset labels.Labels, t int64, v float64) error // Commit submits the collected samples and purges the batch. Commit() error } -// Appender returns a new appender against the database. -func (db *DB) Appender() Appender { - return &bucketAppender{ - db: db, - buckets: make([][]hashedSample, numPartitions), - } -} - -type bucketAppender struct { - db *DB - buckets [][]hashedSample -} - -func (ba *bucketAppender) Add(lset labels.Labels, t int64, v float64) error { - h := lset.Hash() - s := h >> (64 - partitionShift) - - ba.buckets[s] = append(ba.buckets[s], hashedSample{ - hash: h, - labels: lset, - t: t, - v: v, - }) - - return nil -} - -func (ba *bucketAppender) reset() { - for i := range ba.buckets { - ba.buckets[i] = ba.buckets[i][:0] - } -} - -func (ba *bucketAppender) Commit() error { - defer ba.reset() - - var merr MultiError - - // Spill buckets into partitions. - for s, b := range ba.buckets { - merr.Add(ba.db.partitions[s].appendBatch(b)) - } - return merr.Err() -} - type hashedSample struct { hash uint64 labels labels.Labels @@ -175,12 +62,12 @@ type hashedSample struct { const sep = '\xff' -// Partition handles reads and writes of time series falling into -// a hashed partition of a series. -type Partition struct { +// DB handles reads and writes of time series falling into +// a hashed partition of a seriedb. +type DB struct { dir string logger log.Logger - metrics *partitionMetrics + metrics *dbMetrics mtx sync.RWMutex persisted []*persistedBlock @@ -188,34 +75,27 @@ type Partition struct { compactor *compactor } -type partitionMetrics struct { +type dbMetrics struct { persistences prometheus.Counter persistenceDuration prometheus.Histogram samplesAppended prometheus.Counter } -func newPartitionMetrics(r prometheus.Registerer, i int) *partitionMetrics { - partitionLabel := prometheus.Labels{ - "partition": fmt.Sprintf("%d", i), - } - - m := &partitionMetrics{} +func newDBMetrics(r prometheus.Registerer) *dbMetrics { + m := &dbMetrics{} m.persistences = prometheus.NewCounter(prometheus.CounterOpts{ - Name: "tsdb_partition_persistences_total", - Help: "Total number of head persistances that ran so far.", - ConstLabels: partitionLabel, + Name: "tsdb_persistences_total", + Help: "Total number of head persistances that ran so far.", }) m.persistenceDuration = prometheus.NewHistogram(prometheus.HistogramOpts{ - Name: "tsdb_partition_persistence_duration_seconds", - Help: "Duration of persistences in seconds.", - ConstLabels: partitionLabel, - Buckets: prometheus.ExponentialBuckets(0.25, 2, 5), + Name: "tsdb_persistence_duration_seconds", + Help: "Duration of persistences in seconddb.", + Buckets: prometheus.ExponentialBuckets(0.25, 2, 5), }) m.samplesAppended = prometheus.NewCounter(prometheus.CounterOpts{ - Name: "tsdb_partition_samples_appended_total", - Help: "Total number of appended samples for the partition.", - ConstLabels: partitionLabel, + Name: "tsdb_samples_appended_total", + Help: "Total number of appended sampledb.", }) if r != nil { @@ -228,8 +108,8 @@ func newPartitionMetrics(r prometheus.Registerer, i int) *partitionMetrics { return m } -// OpenPartition returns a new Partition. -func OpenPartition(dir string, i int, logger log.Logger) (p *Partition, err error) { +// Open returns a new DB in the given directory. +func Open(dir string, logger log.Logger) (p *DB, err error) { // Create directory if partition is new. if !fileutil.Exist(dir) { if err := os.MkdirAll(dir, 0777); err != nil { @@ -237,15 +117,15 @@ func OpenPartition(dir string, i int, logger log.Logger) (p *Partition, err erro } } - p = &Partition{ + p = &DB{ dir: dir, logger: logger, - metrics: newPartitionMetrics(nil, i), + metrics: newDBMetrics(nil), } if err := p.initBlocks(); err != nil { return nil, err } - if p.compactor, err = newCompactor(i, p, logger); err != nil { + if p.compactor, err = newCompactor(p, logger); err != nil { return nil, err } @@ -262,13 +142,13 @@ func isBlockDir(fi os.FileInfo) bool { return true } -func (p *Partition) initBlocks() error { +func (db *DB) initBlocks() error { var ( pbs []*persistedBlock heads []*HeadBlock ) - files, err := ioutil.ReadDir(p.dir) + files, err := ioutil.ReadDir(db.dir) if err != nil { return err } @@ -277,7 +157,7 @@ func (p *Partition) initBlocks() error { if !isBlockDir(fi) { continue } - dir := filepath.Join(p.dir, fi.Name()) + dir := filepath.Join(db.dir, fi.Name()) if fileutil.Exist(filepath.Join(dir, walFileName)) { h, err := OpenHeadBlock(dir) @@ -311,68 +191,68 @@ func (p *Partition) initBlocks() error { lastTime = b.stats().MaxTime } - p.persisted = pbs - p.heads = heads + db.persisted = pbs + db.heads = heads if len(heads) == 0 { - return p.cut() + return db.cut() } return nil } // Close the partition. -func (p *Partition) Close() error { +func (db *DB) Close() error { var merr MultiError - merr.Add(p.compactor.Close()) + merr.Add(db.compactor.Close()) - p.mtx.Lock() - defer p.mtx.Unlock() + db.mtx.Lock() + defer db.mtx.Unlock() - for _, pb := range p.persisted { + for _, pb := range db.persisted { merr.Add(pb.Close()) } - for _, hb := range p.heads { + for _, hb := range db.heads { merr.Add(hb.Close()) } return merr.Err() } -func (s *Partition) appendBatch(samples []hashedSample) error { +func (db *DB) appendBatch(samples []hashedSample) error { if len(samples) == 0 { return nil } - s.mtx.Lock() - defer s.mtx.Unlock() + db.mtx.Lock() + defer db.mtx.Unlock() - head := s.heads[len(s.heads)-1] + head := db.heads[len(db.heads)-1] // TODO(fabxc): distinguish samples between concurrent heads for // different time blocks. Those may occurr during transition to still // allow late samples to arrive for a previous block. err := head.appendBatch(samples) if err == nil { - s.metrics.samplesAppended.Add(float64(len(samples))) + db.metrics.samplesAppended.Add(float64(len(samples))) } // TODO(fabxc): randomize over time and use better scoring function. if head.bstats.SampleCount/(uint64(head.bstats.ChunkCount)+1) > 250 { - if err := s.cut(); err != nil { - s.logger.Log("msg", "cut failed", "err", err) + if err := db.cut(); err != nil { + db.logger.Log("msg", "cut failed", "err", err) } else { - s.compactor.trigger() + db.compactor.trigger() } } return err } -func (s *Partition) lock() sync.Locker { - return &s.mtx +func (db *DB) lock() sync.Locker { + return &db.mtx } -func (s *Partition) headForDir(dir string) (int, bool) { - for i, b := range s.heads { +func (db *DB) headForDir(dir string) (int, bool) { + for i, b := range db.heads { if b.dir() == dir { return i, true } @@ -380,8 +260,8 @@ func (s *Partition) headForDir(dir string) (int, bool) { return -1, false } -func (s *Partition) persistedForDir(dir string) (int, bool) { - for i, b := range s.persisted { +func (db *DB) persistedForDir(dir string) (int, bool) { + for i, b := range db.persisted { if b.dir() == dir { return i, true } @@ -389,34 +269,34 @@ func (s *Partition) persistedForDir(dir string) (int, bool) { return -1, false } -func (s *Partition) reinit(dir string) error { +func (db *DB) reinit(dir string) error { if !fileutil.Exist(dir) { - if i, ok := s.headForDir(dir); ok { - if err := s.heads[i].Close(); err != nil { + if i, ok := db.headForDir(dir); ok { + if err := db.heads[i].Close(); err != nil { return err } - s.heads = append(s.heads[:i], s.heads[i+1:]...) + db.heads = append(db.heads[:i], db.heads[i+1:]...) } - if i, ok := s.persistedForDir(dir); ok { - if err := s.persisted[i].Close(); err != nil { + if i, ok := db.persistedForDir(dir); ok { + if err := db.persisted[i].Close(); err != nil { return err } - s.persisted = append(s.persisted[:i], s.persisted[i+1:]...) + db.persisted = append(db.persisted[:i], db.persisted[i+1:]...) } return nil } // Remove a previous head block. - if i, ok := s.headForDir(dir); ok { - if err := s.heads[i].Close(); err != nil { + if i, ok := db.headForDir(dir); ok { + if err := db.heads[i].Close(); err != nil { return err } - s.heads = append(s.heads[:i], s.heads[i+1:]...) + db.heads = append(db.heads[:i], db.heads[i+1:]...) } // Close an old persisted block. - i, ok := s.persistedForDir(dir) + i, ok := db.persistedForDir(dir) if ok { - if err := s.persisted[i].Close(); err != nil { + if err := db.persisted[i].Close(); err != nil { return err } } @@ -425,28 +305,28 @@ func (s *Partition) reinit(dir string) error { return errors.Wrap(err, "open persisted block") } if i >= 0 { - s.persisted[i] = pb + db.persisted[i] = pb } else { - s.persisted = append(s.persisted, pb) + db.persisted = append(db.persisted, pb) } return nil } -func (s *Partition) compactable() []block { +func (db *DB) compactable() []block { var blocks []block - for _, pb := range s.persisted { + for _, pb := range db.persisted { blocks = append([]block{pb}, blocks...) } - // threshold := s.heads[len(s.heads)-1].bstats.MaxTime - headGracePeriod + // threshold := db.heads[len(db.heads)-1].bstatdb.MaxTime - headGracePeriod - // for _, hb := range s.heads { - // if hb.bstats.MaxTime < threshold { + // for _, hb := range db.heads { + // if hb.bstatdb.MaxTime < threshold { // blocks = append(blocks, hb) // } // } - for _, hb := range s.heads[:len(s.heads)-1] { + for _, hb := range db.heads[:len(db.heads)-1] { blocks = append([]block{hb}, blocks...) } @@ -469,17 +349,17 @@ func intervalContains(min, max, t int64) bool { // blocksForInterval returns all blocks within the partition that may contain // data for the given time range. -func (s *Partition) blocksForInterval(mint, maxt int64) []block { +func (db *DB) blocksForInterval(mint, maxt int64) []block { var bs []block - for _, b := range s.persisted { + for _, b := range db.persisted { bmin, bmax := b.interval() if intervalOverlap(mint, maxt, bmin, bmax) { bs = append(bs, b) } } - for _, b := range s.heads { + for _, b := range db.heads { bmin, bmax := b.interval() if intervalOverlap(mint, maxt, bmin, bmax) { @@ -495,7 +375,7 @@ const headGracePeriod = 60 * 1000 // 60 seconds for millisecond scale // cut starts a new head block to append to. The completed head block // will still be appendable for the configured grace period. -func (p *Partition) cut() error { +func (p *DB) cut() error { dir, err := p.nextBlockDir() if err != nil { return err @@ -509,7 +389,7 @@ func (p *Partition) cut() error { return nil } -func (p *Partition) nextBlockDir() (string, error) { +func (p *DB) nextBlockDir() (string, error) { names, err := fileutil.ReadDir(p.dir) if err != nil { return "", err @@ -529,13 +409,13 @@ type chunkDesc struct { lset labels.Labels chunk chunks.Chunk - // Caching fields. + // Caching fielddb. firstTimestamp int64 lastTimestamp int64 lastValue float64 numSamples int - app chunks.Appender // Current appender for the chunks. + app chunks.Appender // Current appender for the chunkdb. } func (cd *chunkDesc) append(ts int64, v float64) { @@ -549,6 +429,121 @@ func (cd *chunkDesc) append(ts int64, v float64) { cd.numSamples++ } +// PartitionedDB is a time series storage. +type PartitionedDB struct { + logger log.Logger + opts *Options + dir string + + partitionPow uint + Partitions []*DB +} + +func isPowTwo(x int) bool { + return x > 0 && (x&(x-1)) == 0 +} + +// OpenPartitioned or create a new DB. +func OpenPartitioned(dir string, n int, l log.Logger, opts *Options) (*PartitionedDB, error) { + if !isPowTwo(n) { + return nil, errors.Errorf("%d is not a power of two", n) + } + if opts == nil { + opts = DefaultOptions + } + if l == nil { + l = log.NewLogfmtLogger(os.Stdout) + l = log.NewContext(l).With("ts", log.DefaultTimestampUTC, "caller", log.DefaultCaller) + } + + if err := os.MkdirAll(dir, 0777); err != nil { + return nil, err + } + c := &PartitionedDB{ + logger: l, + opts: opts, + dir: dir, + partitionPow: uint(math.Log2(float64(n))), + } + + // Initialize vertical partitiondb. + // TODO(fabxc): validate partition number to be power of 2, which is required + // for the bitshift-modulo when finding the right partition. + for i := 0; i < n; i++ { + l := log.NewContext(l).With("partition", i) + d := partitionDir(dir, i) + + s, err := Open(d, l) + if err != nil { + return nil, fmt.Errorf("initializing partition %q failed: %s", d, err) + } + + c.Partitions = append(c.Partitions, s) + } + + return c, nil +} + +func partitionDir(base string, i int) string { + return filepath.Join(base, fmt.Sprintf("p-%0.4d", i)) +} + +// Close the database. +func (db *PartitionedDB) Close() error { + var g errgroup.Group + + for _, partition := range db.Partitions { + g.Go(partition.Close) + } + + return g.Wait() +} + +// Appender returns a new appender against the database. +func (db *PartitionedDB) Appender() Appender { + return &partitionedAppender{ + db: db, + buckets: make([][]hashedSample, len(db.Partitions)), + } +} + +type partitionedAppender struct { + db *PartitionedDB + buckets [][]hashedSample +} + +func (ba *partitionedAppender) Add(lset labels.Labels, t int64, v float64) error { + h := lset.Hash() + s := h >> (64 - ba.db.partitionPow) + + ba.buckets[s] = append(ba.buckets[s], hashedSample{ + hash: h, + labels: lset, + t: t, + v: v, + }) + + return nil +} + +func (ba *partitionedAppender) reset() { + for i := range ba.buckets { + ba.buckets[i] = ba.buckets[i][:0] + } +} + +func (ba *partitionedAppender) Commit() error { + defer ba.reset() + + var merr MultiError + + // Spill buckets into partitiondb. + for s, b := range ba.buckets { + merr.Add(ba.db.Partitions[s].appendBatch(b)) + } + return merr.Err() +} + // The MultiError type implements the error interface, and contains the // Errors used to construct it. type MultiError []error diff --git a/querier.go b/querier.go index 0e0c25822..4c0cca9a1 100644 --- a/querier.go +++ b/querier.go @@ -43,12 +43,12 @@ type querier struct { // Querier returns a new querier over the database for the given // time range. -func (db *DB) Querier(mint, maxt int64) Querier { +func (db *PartitionedDB) Querier(mint, maxt int64) Querier { q := &querier{ mint: mint, maxt: maxt, } - for _, s := range db.partitions { + for _, s := range db.Partitions { q.partitions = append(q.partitions, s.Querier(mint, maxt)) } @@ -129,13 +129,13 @@ func (q *querier) Close() error { // partitionQuerier aggregates querying results from time blocks within // a single partition. type partitionQuerier struct { - partition *Partition + partition *DB blocks []Querier } // Querier returns a new querier over the data partition for the given // time range. -func (s *Partition) Querier(mint, maxt int64) Querier { +func (s *DB) Querier(mint, maxt int64) Querier { s.mtx.RLock() blocks := s.blocksForInterval(mint, maxt)