diff --git a/block.go b/block.go index d24198a68..ef6d71a73 100644 --- a/block.go +++ b/block.go @@ -19,12 +19,11 @@ const ( type Block interface{} type block interface { - stats() *seriesStats + stats() *blockStats seriesData() seriesDataIterator } type persistedBlock struct { - } type seriesDataIterator interface { @@ -50,7 +49,7 @@ const ( const ( seriesMetaSize = int(unsafe.Sizeof(seriesMeta{})) - seriesStatsSize = int(unsafe.Sizeof(seriesStats{})) + seriesStatsSize = int(unsafe.Sizeof(blockStats{})) ) type seriesMeta struct { @@ -59,7 +58,7 @@ type seriesMeta struct { _ [7]byte // padding/reserved } -type seriesStats struct { +type blockStats struct { series uint32 samples uint64 _ [4]byte // padding/reserved @@ -69,9 +68,9 @@ func (s *persistedSeries) meta() *seriesMeta { return (*seriesMeta)(unsafe.Pointer(&s.data[0])) } -func (s *persistedSeries) stats() *seriesStats { +func (s *persistedSeries) stats() *blockStats { // The stats start right behind the block meta data. - return (*seriesStats)(unsafe.Pointer(&s.data[seriesMetaSize])) + return (*blockStats)(unsafe.Pointer(&s.data[seriesMetaSize])) } // seriesAt returns the series stored at offset as a skiplist and the chunks diff --git a/db.go b/db.go index 6cd2b4a3c..0b28409d8 100644 --- a/db.go +++ b/db.go @@ -77,8 +77,8 @@ func (db *DB) Close() error { for i, shard := range db.shards { fmt.Println("shard", i) - fmt.Println(" num chunks", len(shard.head.forward)) - fmt.Println(" num samples", shard.head.samples) + fmt.Println(" num chunks", shard.head.stats().series) + fmt.Println(" num samples", shard.head.stats().samples) wg.Add(1) go func(i int, shard *SeriesShard) { @@ -185,16 +185,16 @@ func NewSeriesShard(path string) *SeriesShard { return &SeriesShard{ path: path, // TODO(fabxc): restore from checkpoint. - head: &HeadBlock{ - ivIndex: newMemIndex(), - descs: map[uint64][]*chunkDesc{}, - values: map[string]stringset{}, - forward: map[uint32]*chunkDesc{}, - }, + head: NewHeadBlock(), // TODO(fabxc): provide access to persisted blocks. } } +// blockFor returns the block of shard series that contains the given timestamp. +func (s *SeriesShard) blockFor(ts int64) block { + return nil +} + // chunkDesc wraps a plain data chunk and provides cached meta data about it. type chunkDesc struct { lset Labels diff --git a/head.go b/head.go index 5b283abb3..7b191fb62 100644 --- a/head.go +++ b/head.go @@ -2,8 +2,6 @@ package tsdb import ( "math" - "sort" - "strings" "sync" "github.com/fabxc/tsdb/chunks" @@ -11,15 +9,20 @@ import ( // HeadBlock handles reads and writes of time series data within a time window. type HeadBlock struct { - mtx sync.RWMutex - descs map[uint64][]*chunkDesc // labels hash to possible chunks descs - forward map[uint32]*chunkDesc // chunk ID to chunk desc - values map[string]stringset // label names to possible values - ivIndex *memIndex // inverted index for label pairs + mtx sync.RWMutex + descs map[uint64][]*chunkDesc // labels hash to possible chunks descs + index *memIndex samples uint64 // total samples in the block. } +func NewHeadBlock() *HeadBlock { + return &HeadBlock{ + descs: make(map[uint64][]*chunkDesc, 2048), + index: newMemIndex(), + } +} + // get retrieves the chunk with the hash and label set and creates // a new one if it doesn't exist yet. func (h *HeadBlock) get(hash uint64, lset Labels) *chunkDesc { @@ -34,39 +37,12 @@ func (h *HeadBlock) get(hash uint64, lset Labels) *chunkDesc { lset: lset, chunk: chunks.NewXORChunk(int(math.MaxInt64)), } - h.index(cd) + h.index.add(cd) h.descs[hash] = append(cds, cd) return cd } -func (h *HeadBlock) index(chkd *chunkDesc) { - // Add each label pair as a term to the inverted index. - terms := make([]string, 0, len(chkd.lset)) - b := make([]byte, 0, 64) - - for _, l := range chkd.lset { - b = append(b, l.Name...) - b = append(b, sep) - b = append(b, l.Value...) - - terms = append(terms, string(b)) - b = b[:0] - - // Add to label name to values index. - valset, ok := h.values[l.Name] - if !ok { - valset = stringset{} - h.values[l.Name] = valset - } - valset.set(l.Value) - } - id := h.ivIndex.add(terms...) - - // Store forward index for the returned ID. - h.forward[id] = chkd -} - // append adds the sample to the headblock. func (h *HeadBlock) append(hash uint64, lset Labels, ts int64, v float64) error { if err := h.get(hash, lset).append(ts, v); err != nil { @@ -76,9 +52,9 @@ func (h *HeadBlock) append(hash uint64, lset Labels, ts int64, v float64) error return nil } -func (h *HeadBlock) stats() *seriesStats { - return &seriesStats{ - series: uint32(len(h.forward)), +func (h *HeadBlock) stats() *blockStats { + return &blockStats{ + series: uint32(h.index.numSeries()), samples: h.samples, } } @@ -88,11 +64,11 @@ func (h *HeadBlock) seriesData() seriesDataIterator { defer h.mtx.RUnlock() it := &chunkDescsIterator{ - descs: make([]*chunkDesc, 0, len(h.forward)), + descs: make([]*chunkDesc, 0, len(h.index.forward)), i: -1, } - for _, cd := range h.forward { + for _, cd := range h.index.forward { it.descs = append(it.descs, cd) } return it @@ -115,27 +91,3 @@ func (it *chunkDescsIterator) values() (skiplist, []chunks.Chunk) { func (it *chunkDescsIterator) err() error { return nil } - -type stringset map[string]struct{} - -func (ss stringset) set(s string) { - ss[s] = struct{}{} -} - -func (ss stringset) has(s string) bool { - _, ok := ss[s] - return ok -} - -func (ss stringset) String() string { - return strings.Join(ss.slice(), ",") -} - -func (ss stringset) slice() []string { - slice := make([]string, 0, len(ss)) - for k := range ss { - slice = append(slice, k) - } - sort.Strings(slice) - return slice -} diff --git a/index.go b/index.go index 064f31f23..f236410a5 100644 --- a/index.go +++ b/index.go @@ -1,38 +1,86 @@ package tsdb -import "sort" +import ( + "sort" + "strings" +) // Index provides read access to an inverted index. type Index interface { Postings(ref uint32) Iterator } -// memIndex is an inverted in-memory index. type memIndex struct { lastID uint32 - m map[string][]uint32 + + forward map[uint32]*chunkDesc // chunk ID to chunk desc + values map[string]stringset // label names to possible values + postings *memPostings // postings lists for terms +} + +// newMemIndex returns a new in-memory index. +func newMemIndex() *memIndex { + return &memIndex{ + lastID: 0, + forward: make(map[uint32]*chunkDesc), + values: make(map[string]stringset), + postings: &memPostings{m: make(map[string][]uint32)}, + } +} + +func (ix *memIndex) numSeries() int { + return len(ix.forward) +} + +func (ix *memIndex) Postings(s string) Iterator { + return ix.postings.get(s) +} + +func (ix *memIndex) add(chkd *chunkDesc) { + // Add each label pair as a term to the inverted index. + terms := make([]string, 0, len(chkd.lset)) + b := make([]byte, 0, 64) + + for _, l := range chkd.lset { + b = append(b, l.Name...) + b = append(b, sep) + b = append(b, l.Value...) + + terms = append(terms, string(b)) + b = b[:0] + + // Add to label name to values index. + valset, ok := ix.values[l.Name] + if !ok { + valset = stringset{} + ix.values[l.Name] = valset + } + valset.set(l.Value) + } + ix.lastID++ + id := ix.lastID + + ix.postings.add(id, terms...) + + // Store forward index for the returned ID. + ix.forward[id] = chkd +} + +type memPostings struct { + m map[string][]uint32 } // Postings returns an iterator over the postings list for s. -func (ix *memIndex) Postings(s string) Iterator { - return &listIterator{list: ix.m[s], idx: -1} +func (p *memPostings) get(s string) Iterator { + return &listIterator{list: p.m[s], idx: -1} } // add adds a document to the index. The caller has to ensure that no // term argument appears twice. -func (ix *memIndex) add(terms ...string) uint32 { - ix.lastID++ - +func (p *memPostings) add(id uint32, terms ...string) { for _, t := range terms { - ix.m[t] = append(ix.m[t], ix.lastID) + p.m[t] = append(p.m[t], id) } - - return ix.lastID -} - -// newMemIndex returns a new in-memory index. -func newMemIndex() *memIndex { - return &memIndex{m: make(map[string][]uint32)} } // Iterator provides iterative access over a postings list. @@ -47,11 +95,6 @@ type Iterator interface { Value() uint32 } -// compressIndex returns a compressed index for the given input index. -func compressIndex(ix Index) { - -} - // Intersect returns a new iterator over the intersection of the // input iterators. func Intersect(its ...Iterator) Iterator { @@ -133,3 +176,27 @@ func (it *listIterator) Seek(x uint32) bool { }) return it.idx < len(it.list) } + +type stringset map[string]struct{} + +func (ss stringset) set(s string) { + ss[s] = struct{}{} +} + +func (ss stringset) has(s string) bool { + _, ok := ss[s] + return ok +} + +func (ss stringset) String() string { + return strings.Join(ss.slice(), ",") +} + +func (ss stringset) slice() []string { + slice := make([]string, 0, len(ss)) + for k := range ss { + slice = append(slice, k) + } + sort.Strings(slice) + return slice +}