From 4590b61343d827600b8797bcc6b3d25aca2d19ed Mon Sep 17 00:00:00 2001 From: Fabian Reinartz Date: Fri, 6 Jan 2017 08:08:02 +0100 Subject: [PATCH] Rename shard to partition --- compact.go | 26 ++++++------ db.go | 110 ++++++++++++++++++++++++------------------------ querier.go | 64 ++++++++++++++-------------- querier_test.go | 6 +-- 4 files changed, 103 insertions(+), 103 deletions(-) diff --git a/compact.go b/compact.go index 9c2103d47..5cbcd8857 100644 --- a/compact.go +++ b/compact.go @@ -31,31 +31,31 @@ type compactorMetrics struct { } func newCompactorMetrics(i int) *compactorMetrics { - shardLabel := prometheus.Labels{ - "shard": fmt.Sprintf("%d", i), + partitionLabel := prometheus.Labels{ + "partition": fmt.Sprintf("%d", i), } m := &compactorMetrics{} m.triggered = prometheus.NewCounter(prometheus.CounterOpts{ - Name: "tsdb_shard_compactions_triggered_total", - Help: "Total number of triggered compactions for the shard.", - ConstLabels: shardLabel, + Name: "tsdb_partition_compactions_triggered_total", + Help: "Total number of triggered compactions for the partition.", + ConstLabels: partitionLabel, }) m.ran = prometheus.NewCounter(prometheus.CounterOpts{ - Name: "tsdb_shard_compactions_total", - Help: "Total number of compactions that were executed for the shard.", - ConstLabels: shardLabel, + Name: "tsdb_partition_compactions_total", + Help: "Total number of compactions that were executed for the partition.", + ConstLabels: partitionLabel, }) m.failed = prometheus.NewCounter(prometheus.CounterOpts{ - Name: "tsdb_shard_compactions_failed_total", - Help: "Total number of compactions that failed for the shard.", - ConstLabels: shardLabel, + Name: "tsdb_partition_compactions_failed_total", + Help: "Total number of compactions that failed for the partition.", + ConstLabels: partitionLabel, }) m.duration = prometheus.NewHistogram(prometheus.HistogramOpts{ - Name: "tsdb_shard_compaction_duration", + Name: "tsdb_partition_compaction_duration", Help: "Duration of compaction runs.", - ConstLabels: shardLabel, + ConstLabels: partitionLabel, }) return m diff --git a/db.go b/db.go index 6b7703806..ab46d141b 100644 --- a/db.go +++ b/db.go @@ -41,14 +41,14 @@ type DB struct { opts *Options path string - shards []*Shard + partitions []*Partition } // TODO(fabxc): make configurable const ( - shardShift = 0 - numShards = 1 << shardShift - maxChunkSize = 1024 + partitionShift = 0 + numPartitions = 1 << partitionShift + maxChunkSize = 1024 ) // Open or create a new DB. @@ -70,25 +70,25 @@ func Open(path string, l log.Logger, opts *Options) (*DB, error) { path: path, } - // Initialize vertical shards. - // TODO(fabxc): validate shard number to be power of 2, which is required - // for the bitshift-modulo when finding the right shard. - for i := 0; i < numShards; i++ { - l := log.NewContext(l).With("shard", i) - d := shardDir(path, i) + // Initialize vertical partitions. + // TODO(fabxc): validate partition number to be power of 2, which is required + // for the bitshift-modulo when finding the right partition. + for i := 0; i < numPartitions; i++ { + l := log.NewContext(l).With("partition", i) + d := partitionDir(path, i) - s, err := OpenShard(d, i, l) + s, err := OpenPartition(d, i, l) if err != nil { - return nil, fmt.Errorf("initializing shard %q failed: %s", d, err) + return nil, fmt.Errorf("initializing partition %q failed: %s", d, err) } - c.shards = append(c.shards, s) + c.partitions = append(c.partitions, s) } return c, nil } -func shardDir(base string, i int) string { +func partitionDir(base string, i int) string { return filepath.Join(base, strconv.Itoa(i)) } @@ -96,8 +96,8 @@ func shardDir(base string, i int) string { func (db *DB) Close() error { var g errgroup.Group - for _, shard := range db.shards { - g.Go(shard.Close) + for _, partition := range db.partitions { + g.Go(partition.Close) } return g.Wait() @@ -122,7 +122,7 @@ type Appender interface { func (db *DB) Appender() Appender { return &bucketAppender{ db: db, - buckets: make([][]hashedSample, numShards), + buckets: make([][]hashedSample, numPartitions), } } @@ -133,7 +133,7 @@ type bucketAppender struct { func (ba *bucketAppender) Add(lset labels.Labels, t int64, v float64) error { h := lset.Hash() - s := h >> (64 - shardShift) + s := h >> (64 - partitionShift) ba.buckets[s] = append(ba.buckets[s], hashedSample{ hash: h, @@ -156,9 +156,9 @@ func (ba *bucketAppender) Commit() error { var merr MultiError - // Spill buckets into shards. + // Spill buckets into partitions. for s, b := range ba.buckets { - merr.Add(ba.db.shards[s].appendBatch(b)) + merr.Add(ba.db.partitions[s].appendBatch(b)) } return merr.Err() } @@ -174,12 +174,12 @@ type hashedSample struct { const sep = '\xff' -// Shard handles reads and writes of time series falling into -// a hashed shard of a series. -type Shard struct { +// Partition handles reads and writes of time series falling into +// a hashed partition of a series. +type Partition struct { path string logger log.Logger - metrics *shardMetrics + metrics *partitionMetrics mtx sync.RWMutex persisted []*persistedBlock @@ -190,34 +190,34 @@ type Shard struct { cutc chan struct{} } -type shardMetrics struct { +type partitionMetrics struct { persistences prometheus.Counter persistenceDuration prometheus.Histogram samplesAppended prometheus.Counter } -func newShardMetrics(r prometheus.Registerer, i int) *shardMetrics { - shardLabel := prometheus.Labels{ - "shard": fmt.Sprintf("%d", i), +func newPartitionMetrics(r prometheus.Registerer, i int) *partitionMetrics { + partitionLabel := prometheus.Labels{ + "partition": fmt.Sprintf("%d", i), } - m := &shardMetrics{} + m := &partitionMetrics{} m.persistences = prometheus.NewCounter(prometheus.CounterOpts{ - Name: "tsdb_shard_persistences_total", + Name: "tsdb_partition_persistences_total", Help: "Total number of head persistances that ran so far.", - ConstLabels: shardLabel, + ConstLabels: partitionLabel, }) m.persistenceDuration = prometheus.NewHistogram(prometheus.HistogramOpts{ - Name: "tsdb_shard_persistence_duration_seconds", + Name: "tsdb_partition_persistence_duration_seconds", Help: "Duration of persistences in seconds.", - ConstLabels: shardLabel, + ConstLabels: partitionLabel, Buckets: prometheus.ExponentialBuckets(0.25, 2, 5), }) m.samplesAppended = prometheus.NewCounter(prometheus.CounterOpts{ - Name: "tsdb_shard_samples_appended_total", - Help: "Total number of appended samples for the shard.", - ConstLabels: shardLabel, + Name: "tsdb_partition_samples_appended_total", + Help: "Total number of appended samples for the partition.", + ConstLabels: partitionLabel, }) if r != nil { @@ -230,9 +230,9 @@ func newShardMetrics(r prometheus.Registerer, i int) *shardMetrics { return m } -// OpenShard returns a new Shard. -func OpenShard(path string, i int, logger log.Logger) (*Shard, error) { - // Create directory if shard is new. +// OpenPartition returns a new Partition. +func OpenPartition(path string, i int, logger log.Logger) (*Partition, error) { + // Create directory if partition is new. if _, err := os.Stat(path); os.IsNotExist(err) { if err := os.MkdirAll(path, 0777); err != nil { return nil, err @@ -258,10 +258,10 @@ func OpenShard(path string, i int, logger log.Logger) (*Shard, error) { heads = []*HeadBlock{head} } - s := &Shard{ + s := &Partition{ path: path, logger: logger, - metrics: newShardMetrics(nil, i), + metrics: newPartitionMetrics(nil, i), heads: heads, persisted: persisted, cutc: make(chan struct{}, 1), @@ -275,7 +275,7 @@ func OpenShard(path string, i int, logger log.Logger) (*Shard, error) { return s, nil } -func (s *Shard) run() { +func (s *Partition) run() { for range s.cutc { // if err := s.cut(); err != nil { // s.logger.Log("msg", "cut error", "err", err) @@ -296,8 +296,8 @@ func (s *Shard) run() { close(s.donec) } -// Close the shard. -func (s *Shard) Close() error { +// Close the partition. +func (s *Partition) Close() error { close(s.cutc) <-s.donec @@ -317,7 +317,7 @@ func (s *Shard) Close() error { return merr.Err() } -func (s *Shard) appendBatch(samples []hashedSample) error { +func (s *Partition) appendBatch(samples []hashedSample) error { if len(samples) == 0 { return nil } @@ -344,11 +344,11 @@ func (s *Shard) appendBatch(samples []hashedSample) error { return err } -func (s *Shard) lock() sync.Locker { +func (s *Partition) lock() sync.Locker { return &s.mtx } -func (s *Shard) headForDir(dir string) (int, bool) { +func (s *Partition) headForDir(dir string) (int, bool) { for i, b := range s.heads { if b.dir() == dir { return i, true @@ -357,7 +357,7 @@ func (s *Shard) headForDir(dir string) (int, bool) { return -1, false } -func (s *Shard) persistedForDir(dir string) (int, bool) { +func (s *Partition) persistedForDir(dir string) (int, bool) { for i, b := range s.persisted { if b.dir() == dir { return i, true @@ -366,7 +366,7 @@ func (s *Shard) persistedForDir(dir string) (int, bool) { return -1, false } -func (s *Shard) reinit(dir string) error { +func (s *Partition) reinit(dir string) error { if !fileutil.Exist(dir) { if i, ok := s.headForDir(dir); ok { if err := s.heads[i].Close(); err != nil { @@ -410,7 +410,7 @@ func (s *Shard) reinit(dir string) error { return nil } -func (s *Shard) compactable() []block { +func (s *Partition) compactable() []block { var blocks []block for _, pb := range s.persisted { blocks = append([]block{pb}, blocks...) @@ -444,9 +444,9 @@ func intervalContains(min, max, t int64) bool { return t >= min && t <= max } -// blocksForRange returns all blocks within the shard that may contain +// blocksForRange returns all blocks within the partition that may contain // data for the given time range. -func (s *Shard) blocksForInterval(mint, maxt int64) []block { +func (s *Partition) blocksForInterval(mint, maxt int64) []block { var bs []block for _, b := range s.persisted { @@ -472,7 +472,7 @@ const headGracePeriod = 60 * 1000 // 60 seconds for millisecond scale // cut starts a new head block to append to. The completed head block // will still be appendable for the configured grace period. -func (s *Shard) cut() error { +func (s *Partition) cut() error { // Set new head block. head := s.heads[len(s.heads)-1] @@ -487,7 +487,7 @@ func (s *Shard) cut() error { return nil } -// func (s *Shard) persist() error { +// func (s *Partition) persist() error { // s.mtx.Lock() // // Set new head block. @@ -501,7 +501,7 @@ func (s *Shard) cut() error { // s.mtx.Unlock() -// // TODO(fabxc): add grace period where we can still append to old head shard +// // TODO(fabxc): add grace period where we can still append to old head partition // // before actually persisting it. // dir := filepath.Join(s.path, fmt.Sprintf("%d", head.stats.MinTime)) diff --git a/querier.go b/querier.go index 587c941cb..0e0c25822 100644 --- a/querier.go +++ b/querier.go @@ -35,10 +35,10 @@ type Series interface { Iterator() SeriesIterator } -// querier merges query results from a set of shard querieres. +// querier merges query results from a set of partition querieres. type querier struct { mint, maxt int64 - shards []Querier + partitions []Querier } // Querier returns a new querier over the database for the given @@ -48,19 +48,19 @@ func (db *DB) Querier(mint, maxt int64) Querier { mint: mint, maxt: maxt, } - for _, s := range db.shards { - q.shards = append(q.shards, s.Querier(mint, maxt)) + for _, s := range db.partitions { + q.partitions = append(q.partitions, s.Querier(mint, maxt)) } return q } func (q *querier) Select(ms ...labels.Matcher) SeriesSet { - // We gather the non-overlapping series from every shard and simply + // We gather the non-overlapping series from every partition and simply // return their union. r := &mergedSeriesSet{} - for _, s := range q.shards { + for _, s := range q.partitions { r.sets = append(r.sets, s.Select(ms...)) } if len(r.sets) == 0 { @@ -70,11 +70,11 @@ func (q *querier) Select(ms ...labels.Matcher) SeriesSet { } func (q *querier) LabelValues(n string) ([]string, error) { - res, err := q.shards[0].LabelValues(n) + res, err := q.partitions[0].LabelValues(n) if err != nil { return nil, err } - for _, sq := range q.shards[1:] { + for _, sq := range q.partitions[1:] { pr, err := sq.LabelValues(n) if err != nil { return nil, err @@ -120,29 +120,29 @@ func (q *querier) LabelValuesFor(string, labels.Label) ([]string, error) { func (q *querier) Close() error { var merr MultiError - for _, sq := range q.shards { + for _, sq := range q.partitions { merr.Add(sq.Close()) } return merr.Err() } -// shardQuerier aggregates querying results from time blocks within -// a single shard. -type shardQuerier struct { - shard *Shard - blocks []Querier +// partitionQuerier aggregates querying results from time blocks within +// a single partition. +type partitionQuerier struct { + partition *Partition + blocks []Querier } -// Querier returns a new querier over the data shard for the given +// Querier returns a new querier over the data partition for the given // time range. -func (s *Shard) Querier(mint, maxt int64) Querier { +func (s *Partition) Querier(mint, maxt int64) Querier { s.mtx.RLock() blocks := s.blocksForInterval(mint, maxt) - sq := &shardQuerier{ - blocks: make([]Querier, 0, len(blocks)), - shard: s, + sq := &partitionQuerier{ + blocks: make([]Querier, 0, len(blocks)), + partition: s, } for _, b := range blocks { @@ -163,7 +163,7 @@ func (s *Shard) Querier(mint, maxt int64) Querier { return sq } -func (q *shardQuerier) LabelValues(n string) ([]string, error) { +func (q *partitionQuerier) LabelValues(n string) ([]string, error) { res, err := q.blocks[0].LabelValues(n) if err != nil { return nil, err @@ -179,11 +179,11 @@ func (q *shardQuerier) LabelValues(n string) ([]string, error) { return res, nil } -func (q *shardQuerier) LabelValuesFor(string, labels.Label) ([]string, error) { +func (q *partitionQuerier) LabelValuesFor(string, labels.Label) ([]string, error) { return nil, fmt.Errorf("not implemented") } -func (q *shardQuerier) Select(ms ...labels.Matcher) SeriesSet { +func (q *partitionQuerier) Select(ms ...labels.Matcher) SeriesSet { // Sets from different blocks have no time overlap. The reference numbers // they emit point to series sorted in lexicographic order. // We can fully connect partial series by simply comparing with the previous @@ -194,18 +194,18 @@ func (q *shardQuerier) Select(ms ...labels.Matcher) SeriesSet { r := q.blocks[0].Select(ms...) for _, s := range q.blocks[1:] { - r = newShardSeriesSet(r, s.Select(ms...)) + r = newPartitionSeriesSet(r, s.Select(ms...)) } return r } -func (q *shardQuerier) Close() error { +func (q *partitionQuerier) Close() error { var merr MultiError for _, bq := range q.blocks { merr.Add(bq.Close()) } - q.shard.mtx.RUnlock() + q.partition.mtx.RUnlock() return merr.Err() } @@ -359,15 +359,15 @@ func (s *mergedSeriesSet) Next() bool { return s.Next() } -type shardSeriesSet struct { +type partitionSeriesSet struct { a, b SeriesSet cur Series adone, bdone bool } -func newShardSeriesSet(a, b SeriesSet) *shardSeriesSet { - s := &shardSeriesSet{a: a, b: b} +func newPartitionSeriesSet(a, b SeriesSet) *partitionSeriesSet { + s := &partitionSeriesSet{a: a, b: b} // Initialize first elements of both sets as Next() needs // one element look-ahead. s.adone = !s.a.Next() @@ -376,18 +376,18 @@ func newShardSeriesSet(a, b SeriesSet) *shardSeriesSet { return s } -func (s *shardSeriesSet) At() Series { +func (s *partitionSeriesSet) At() Series { return s.cur } -func (s *shardSeriesSet) Err() error { +func (s *partitionSeriesSet) Err() error { if s.a.Err() != nil { return s.a.Err() } return s.b.Err() } -func (s *shardSeriesSet) compare() int { +func (s *partitionSeriesSet) compare() int { if s.adone { return 1 } @@ -397,7 +397,7 @@ func (s *shardSeriesSet) compare() int { return labels.Compare(s.a.At().Labels(), s.b.At().Labels()) } -func (s *shardSeriesSet) Next() bool { +func (s *partitionSeriesSet) Next() bool { if s.adone && s.bdone || s.Err() != nil { return false } diff --git a/querier_test.go b/querier_test.go index 7819f9a6c..8d8609406 100644 --- a/querier_test.go +++ b/querier_test.go @@ -65,7 +65,7 @@ func (it *listSeriesIterator) Err() error { return nil } -func TestShardSeriesSet(t *testing.T) { +func TestPartitionSeriesSet(t *testing.T) { newSeries := func(l map[string]string, s []sample) Series { return &mockSeries{ labels: func() labels.Labels { return labels.FromMap(l) }, @@ -77,7 +77,7 @@ func TestShardSeriesSet(t *testing.T) { // The input sets in order (samples in series in b are strictly // after those in a). a, b SeriesSet - // The composition of a and b in the shard series set must yield + // The composition of a and b in the partition series set must yield // results equivalent to the result series set. exp SeriesSet }{ @@ -170,7 +170,7 @@ func TestShardSeriesSet(t *testing.T) { Outer: for _, c := range cases { - res := newShardSeriesSet(c.a, c.b) + res := newPartitionSeriesSet(c.a, c.b) for { eok, rok := c.exp.Next(), res.Next()