From 52276c696635f8db36cc547c9dd33e59d08aacb0 Mon Sep 17 00:00:00 2001 From: Fabian Reinartz Date: Wed, 7 Dec 2016 17:10:49 +0100 Subject: [PATCH] Bucket samples before appending. This pre-sorts samples into buckets before appending them to reduce locking of shards. --- db.go | 52 +++++++++++++++++++++++++++++++++++++++++----------- shard.go | 9 ++++++++- 2 files changed, 49 insertions(+), 12 deletions(-) diff --git a/db.go b/db.go index 7753cf171..92dfe03c0 100644 --- a/db.go +++ b/db.go @@ -31,8 +31,9 @@ type DB struct { // TODO(fabxc): make configurable const ( - numSeriesShards = 32 - maxChunkSize = 1024 + seriesShardShift = 3 + numSeriesShards = 1 << seriesShardShift + maxChunkSize = 1024 ) // Open or create a new DB. @@ -63,6 +64,12 @@ func Open(path string, l log.Logger, opts *Options) (*DB, error) { // Close the database. func (db *DB) Close() error { + for i, shard := range db.shards { + fmt.Println("shard", i) + fmt.Println(" num chunks", len(shard.head.forward)) + fmt.Println(" num samples", shard.head.samples) + } + return fmt.Errorf("not implemented") } @@ -214,26 +221,49 @@ func LabelsFromMap(m map[string]string) Labels { // Vector is a set of LabelSet associated with one value each. // Label sets and values must have equal length. type Vector struct { - LabelSets []Labels - Values []float64 + Buckets map[uint16][]Sample +} + +type Sample struct { + Hash uint64 + Labels Labels + Value float64 +} + +// Reset the vector but keep resources allocated. +func (v *Vector) Reset() { + v.Buckets = make(map[uint16][]Sample, len(v.Buckets)) +} + +// Add a sample to the vector. +func (v *Vector) Add(lset Labels, val float64) { + h := lset.Hash() + s := uint16(h >> (64 - seriesShardShift)) + + v.Buckets[s] = append(v.Buckets[s], Sample{ + Hash: h, + Labels: lset, + Value: val, + }) } // AppendVector adds values for a list of label sets for the given timestamp // in milliseconds. func (db *DB) AppendVector(ts int64, v *Vector) error { // Sequentially add samples to shards. - for i, ls := range v.LabelSets { - h := ls.Hash() - shard := db.shards[h>>(64-uint(len(db.shards)))] + for s, bkt := range v.Buckets { + shard := db.shards[s] // TODO(fabxc): benchmark whether grouping into shards and submitting to // shards in batches is more efficient. shard.head.mtx.Lock() - if err := shard.head.append(h, ls, ts, v.Values[i]); err != nil { - shard.head.mtx.Unlock() - // TODO(fabxc): handle gracefully and collect multi-error. - return err + for _, smpl := range bkt { + if err := shard.head.append(smpl.Hash, smpl.Labels, ts, smpl.Value); err != nil { + shard.head.mtx.Unlock() + // TODO(fabxc): handle gracefully and collect multi-error. + return err + } } shard.head.mtx.Unlock() } diff --git a/shard.go b/shard.go index 9ad62fd23..e8f36810f 100644 --- a/shard.go +++ b/shard.go @@ -40,6 +40,8 @@ type HeadBlock struct { forward map[uint32]*chunkDesc // chunk ID to chunk desc values map[string][]string // label names to possible values index *memIndex // inverted index for label pairs + + samples uint64 } // Block handles reads against a completed block of time series data within a time window. @@ -97,7 +99,12 @@ func (h *HeadBlock) append(hash uint64, lset Labels, ts int64, v float64) error // Store forward index for the returned ID. h.forward[id] = chkd } - return chkd.append(ts, v) + if err := chkd.append(ts, v); err != nil { + return err + } + + h.samples++ + return nil } // chunkDesc wraps a plain data chunk and provides cached meta data about it.