diff --git a/block.go b/block.go
index 814d2f87f..f18cadd60 100644
--- a/block.go
+++ b/block.go
@@ -10,19 +10,18 @@ import (
 	"github.com/pkg/errors"
 )
 
 // Block handles reads against a block of time series data.
-type block interface {
-	dir() string
-	stats() BlockStats
-	interval() (int64, int64)
-	index() IndexReader
-	series() SeriesReader
-	persisted() bool
+type Block interface {
+	Dir() string
+	Stats() BlockStats
+	Index() IndexReader
+	Series() SeriesReader
+	Persisted() bool
 }
 
+// BlockStats provides stats on a data block.
 type BlockStats struct {
-	// Time range of samples in the block.
-	MinTime, MaxTime int64
+	MinTime, MaxTime int64 // time range of samples in the block
 
 	SampleCount uint64
 	SeriesCount uint64
@@ -37,8 +36,8 @@ const (
 )
 
 type persistedBlock struct {
-	d      string
-	bstats BlockStats
+	dir   string
+	stats *BlockStats
 
 	chunksf, indexf *mmapFile
 
@@ -46,15 +45,15 @@ type persistedBlock struct {
 	indexr *indexReader
 }
 
-func newPersistedBlock(p string) (*persistedBlock, error) {
+func newPersistedBlock(dir string) (*persistedBlock, error) {
 	// TODO(fabxc): validate match of name and stats time, validate magic.
 
 	// mmap files belonging to the block.
-	chunksf, err := openMmapFile(chunksFileName(p))
+	chunksf, err := openMmapFile(chunksFileName(dir))
 	if err != nil {
 		return nil, errors.Wrap(err, "open chunk file")
 	}
-	indexf, err := openMmapFile(indexFileName(p))
+	indexf, err := openMmapFile(indexFileName(dir))
 	if err != nil {
 		return nil, errors.Wrap(err, "open index file")
 	}
@@ -74,12 +73,12 @@ func newPersistedBlock(p string) (*persistedBlock, error) {
 	}
 
 	pb := &persistedBlock{
-		d:       p,
+		dir:     dir,
 		chunksf: chunksf,
 		indexf:  indexf,
 		chunkr:  sr,
 		indexr:  ir,
-		bstats:  stats,
+		stats:   &stats,
 	}
 	return pb, nil
 }
@@ -94,15 +93,11 @@ func (pb *persistedBlock) Close() error {
 	return err1
 }
 
-func (pb *persistedBlock) dir() string          { return pb.d }
-func (pb *persistedBlock) persisted() bool      { return true }
-func (pb *persistedBlock) index() IndexReader   { return pb.indexr }
-func (pb *persistedBlock) series() SeriesReader { return pb.chunkr }
-func (pb *persistedBlock) stats() BlockStats    { return pb.bstats }
-
-func (pb *persistedBlock) interval() (int64, int64) {
-	return pb.bstats.MinTime, pb.bstats.MaxTime
-}
+func (pb *persistedBlock) Dir() string          { return pb.dir }
+func (pb *persistedBlock) Persisted() bool      { return true }
+func (pb *persistedBlock) Index() IndexReader   { return pb.indexr }
+func (pb *persistedBlock) Series() SeriesReader { return pb.chunkr }
+func (pb *persistedBlock) Stats() BlockStats    { return *pb.stats }
 
 func chunksFileName(path string) string {
 	return filepath.Join(path, "chunks-000")
diff --git a/cmd/tsdb/Makefile b/cmd/tsdb/Makefile
index 91c11e39e..cbfb83b95 100644
--- a/cmd/tsdb/Makefile
+++ b/cmd/tsdb/Makefile
@@ -3,7 +3,7 @@ build:
 
 bench: build
 	@echo ">> running benchmark"
-	@./tsdb bench write --out=benchout/ --metrics=$(NUM_METRICS) testdata.100k
-	@go tool pprof -svg ./tsdb benchout/cpu.prof > benchout/cpuprof.svg
-	@go tool pprof -svg ./tsdb benchout/mem.prof > benchout/memprof.svg
-	@go tool pprof -svg ./tsdb benchout/block.prof > benchout/blockprof.svg
+	@./tsdb bench write --out=$(OUT) --metrics=$(NUM_METRICS) testdata.100k
+	@go tool pprof -svg ./tsdb $(OUT)/cpu.prof > $(OUT)/cpuprof.svg
+	@go tool pprof -svg ./tsdb $(OUT)/mem.prof > $(OUT)/memprof.svg
+	@go tool pprof -svg ./tsdb $(OUT)/block.prof > $(OUT)/blockprof.svg
diff --git a/compact.go b/compact.go
index a91eeedb6..bacfca2aa 100644
--- a/compact.go
+++ b/compact.go
@@ -49,11 +49,11 @@ func newCompactorMetrics(r prometheus.Registerer) *compactorMetrics {
 }
 
 type blockStore interface {
-	blocks() []block
+	blocks() []Block
 }
 
 type compactableBlocks interface {
-	compactable() []block
+	compactable() []Block
 }
 
 func newCompactor(blocks compactableBlocks, r prometheus.Registerer) (*compactor, error) {
@@ -70,15 +70,15 @@ const (
 	compactionBlocks = 2
 )
 
-func (c *compactor) pick() []block {
+func (c *compactor) pick() []Block {
 	bs := c.blocks.compactable()
 	if len(bs) == 0 {
 		return nil
 	}
-	if len(bs) == 1 && !bs[0].persisted() {
+	if len(bs) == 1 && !bs[0].Persisted() {
 		return bs
 	}
-	if !bs[0].persisted() {
+	if !bs[0].Persisted() {
 		if len(bs) == 2 || !compactionMatch(bs[:3]) {
 			return bs[:1]
 		}
@@ -93,18 +93,18 @@
 	return nil
 }
 
-func compactionMatch(blocks []block) bool {
+func compactionMatch(blocks []Block) bool {
 	// TODO(fabxc): check whether combined size is below maxCompactionSize.
 	// Apply maximum time range? or number of series? – might already be covered by size implicitly.
 
 	// Naively check whether both blocks have roughly the same number of samples
 	// and whether the total sample count doesn't exceed 2GB chunk file size
 	// by rough approximation.
-	n := float64(blocks[0].stats().SampleCount)
+	n := float64(blocks[0].Stats().SampleCount)
 	t := n
 
 	for _, b := range blocks[1:] {
-		m := float64(b.stats().SampleCount)
+		m := float64(b.Stats().SampleCount)
 
 		if m < 0.8*n || m > 1.2*n {
 			return false
@@ -116,17 +116,17 @@
 	return t < 10*200e6
 }
 
-func mergeStats(blocks ...block) (res BlockStats) {
-	res.MinTime = blocks[0].stats().MinTime
-	res.MaxTime = blocks[len(blocks)-1].stats().MaxTime
+func mergeStats(blocks ...Block) (res BlockStats) {
+	res.MinTime = blocks[0].Stats().MinTime
+	res.MaxTime = blocks[len(blocks)-1].Stats().MaxTime
 
 	for _, b := range blocks {
-		res.SampleCount += b.stats().SampleCount
+		res.SampleCount += b.Stats().SampleCount
 	}
 	return res
 }
 
-func (c *compactor) compact(dir string, blocks ...block) (err error) {
+func (c *compactor) compact(dir string, blocks ...Block) (err error) {
 	start := time.Now()
 	defer func() {
 		if err != nil {
@@ -182,18 +182,18 @@
 	return nil
 }
 
-func (c *compactor) write(blocks []block, indexw IndexWriter, chunkw SeriesWriter) error {
+func (c *compactor) write(blocks []Block, indexw IndexWriter, chunkw SeriesWriter) error {
 	var set compactionSet
 
 	for i, b := range blocks {
-		all, err := b.index().Postings("", "")
+		all, err := b.Index().Postings("", "")
 		if err != nil {
 			return err
 		}
 		// TODO(fabxc): find more transparent way of handling this.
-		if hb, ok := b.(*HeadBlock); ok {
+		if hb, ok := b.(*headBlock); ok {
 			all = hb.remapPostings(all)
 		}
-		s := newCompactionSeriesSet(b.index(), b.series(), all)
+		s := newCompactionSeriesSet(b.Index(), b.Series(), all)
 
 		if i == 0 {
 			set = s
diff --git a/db.go b/db.go
index 2add43a5d..76e82e3ba 100644
--- a/db.go
+++ b/db.go
@@ -73,7 +73,7 @@ type DB struct {
 	mtx Bsync.RWMutex
 
 	persisted []*persistedBlock
-	heads     []*HeadBlock
+	heads     []*headBlock
 	compactor *compactor
 
 	compactc chan struct{}
@@ -174,9 +174,9 @@ func (db *DB) run() {
 			}
 			// TODO(fabxc): pick emits blocks in order. compact acts on
 			// inverted order. Put inversion into compactor?
-			var bs []block
+			var bs []Block
 			for _, b := range blocks {
-				bs = append([]block{b}, bs...)
+				bs = append([]Block{b}, bs...)
 			}
 
 			select {
 			case db.compactc <- struct{}{}:
 			default:
 			}
 		}
 	}
@@ -195,15 +195,15 @@
-func (db *DB) compact(blocks []block) error {
+func (db *DB) compact(blocks []Block) error {
 	if len(blocks) == 0 {
 		return nil
 	}
-	tmpdir := blocks[0].dir() + ".tmp"
+	tmpdir := blocks[0].Dir() + ".tmp"
 
 	// TODO(fabxc): find a better place to do this transparently.
 	for _, b := range blocks {
-		if h, ok := b.(*HeadBlock); ok {
+		if h, ok := b.(*headBlock); ok {
 			h.updateMapping()
 		}
 	}
@@ -215,11 +215,11 @@
 	db.mtx.Lock()
 	defer db.mtx.Unlock()
 
-	if err := renameDir(tmpdir, blocks[0].dir()); err != nil {
+	if err := renameDir(tmpdir, blocks[0].Dir()); err != nil {
 		return errors.Wrap(err, "rename dir")
 	}
 	for _, b := range blocks[1:] {
-		if err := os.RemoveAll(b.dir()); err != nil {
+		if err := os.RemoveAll(b.Dir()); err != nil {
 			return errors.Wrap(err, "delete dir")
 		}
 	}
@@ -227,7 +227,7 @@
 
 	var merr MultiError
 	for _, b := range blocks {
-		merr.Add(errors.Wrapf(db.reinit(b.dir()), "reinit block at %q", b.dir()))
+		merr.Add(errors.Wrapf(db.reinit(b.Dir()), "reinit block at %q", b.Dir()))
 	}
 	return merr.Err()
 }
@@ -248,7 +248,7 @@ func isBlockDir(fi os.FileInfo) bool {
 func (db *DB) initBlocks() error {
 	var (
 		pbs   []*persistedBlock
-		heads []*HeadBlock
+		heads []*headBlock
 	)
 
 	files, err := ioutil.ReadDir(db.dir)
@@ -263,7 +263,7 @@
 		dir := filepath.Join(db.dir, fi.Name())
 
 		if fileutil.Exist(filepath.Join(dir, walFileName)) {
-			h, err := OpenHeadBlock(dir, db.logger)
+			h, err := openHeadBlock(dir, db.logger)
 			if err != nil {
 				return err
 			}
@@ -282,16 +282,16 @@
 	lastTime := int64(math.MinInt64)
 
 	for _, b := range pbs {
-		if b.stats().MinTime < lastTime {
-			return errors.Errorf("illegal order for block at %q", b.dir())
+		if b.Stats().MinTime < lastTime {
+			return errors.Errorf("illegal order for block at %q", b.Dir())
 		}
-		lastTime = b.stats().MaxTime
+		lastTime = b.Stats().MaxTime
 	}
 	for _, b := range heads {
-		if b.stats().MinTime < lastTime {
-			return errors.Errorf("illegal order for block at %q", b.dir())
+		if b.Stats().MinTime < lastTime {
+			return errors.Errorf("illegal order for block at %q", b.Dir())
 		}
-		lastTime = b.stats().MaxTime
+		lastTime = b.Stats().MaxTime
 	}
 
 	db.persisted = pbs
@@ -381,7 +381,7 @@ func (db *DB) appendBatch(samples []hashedSample) error {
 
 func (db *DB) headForDir(dir string) (int, bool) {
 	for i, b := range db.heads {
-		if b.dir() == dir {
+		if b.Dir() == dir {
 			return i, true
 		}
 	}
@@ -390,7 +390,7 @@
 
 func (db *DB) persistedForDir(dir string) (int, bool) {
 	for i, b := range db.persisted {
-		if b.dir() == dir {
+		if b.Dir() == dir {
 			return i, true
 		}
 	}
@@ -441,13 +441,13 @@ func (db *DB) reinit(dir string) error {
 	return nil
 }
 
-func (db *DB) compactable() []block {
+func (db *DB) compactable() []Block {
 	db.mtx.RLock()
 	defer db.mtx.RUnlock()
 
-	var blocks []block
+	var blocks []Block
 	for _, pb := range db.persisted {
-		blocks = append([]block{pb}, blocks...)
+		blocks = append([]Block{pb}, blocks...)
 	}
 
 	// threshold := db.heads[len(db.heads)-1].bstatdb.MaxTime - headGracePeriod
@@ -458,7 +458,7 @@
 	//   }
 	// }
 	for _, hb := range db.heads[:len(db.heads)-1] {
-		blocks = append([]block{hb}, blocks...)
+		blocks = append([]Block{hb}, blocks...)
 	}
 
 	return blocks
@@ -480,20 +480,18 @@ func intervalContains(min, max, t int64) bool {
 
 // blocksForInterval returns all blocks within the partition that may contain
 // data for the given time range.
-func (db *DB) blocksForInterval(mint, maxt int64) []block {
-	var bs []block
+func (db *DB) blocksForInterval(mint, maxt int64) []Block {
+	var bs []Block
 
 	for _, b := range db.persisted {
-		bmin, bmax := b.interval()
-
-		if intervalOverlap(mint, maxt, bmin, bmax) {
+		s := b.Stats()
+		if intervalOverlap(mint, maxt, s.MinTime, s.MaxTime) {
 			bs = append(bs, b)
 		}
 	}
 
 	for _, b := range db.heads {
-		bmin, bmax := b.interval()
-
-		if intervalOverlap(mint, maxt, bmin, bmax) {
+		s := b.Stats()
+		if intervalOverlap(mint, maxt, s.MinTime, s.MaxTime) {
 			bs = append(bs, b)
 		}
 	}
@@ -511,7 +509,7 @@ func (db *DB) cut() error {
 	if err != nil {
 		return err
 	}
-	newHead, err := OpenHeadBlock(dir, db.logger)
+	newHead, err := openHeadBlock(dir, db.logger)
 	if err != nil {
 		return err
 	}
diff --git a/head.go b/head.go
index 2118e367e..264af2211 100644
--- a/head.go
+++ b/head.go
@@ -13,10 +13,10 @@ import (
 	"github.com/go-kit/kit/log"
 )
 
-// HeadBlock handles reads and writes of time series data within a time window.
-type HeadBlock struct {
+// headBlock handles reads and writes of time series data within a time window.
+type headBlock struct {
 	mtx sync.RWMutex
-	d   string
+	dir string
 
 	// descs holds all chunk descs for the head block. Each chunk implicitly
 	// is assigned the index as its ID.
@@ -33,18 +33,18 @@ type HeadBlock struct {
 
 	wal *WAL
 
-	bstats *BlockStats
+	stats *BlockStats
 }
 
-// OpenHeadBlock creates a new empty head block.
-func OpenHeadBlock(dir string, l log.Logger) (*HeadBlock, error) {
+// openHeadBlock opens a head block in dir, replaying its write-ahead log if present.
+func openHeadBlock(dir string, l log.Logger) (*headBlock, error) {
 	wal, err := OpenWAL(dir, log.NewContext(l).With("component", "wal"), 15*time.Second)
 	if err != nil {
 		return nil, err
 	}
 
-	b := &HeadBlock{
-		d:      dir,
+	b := &headBlock{
+		dir:    dir,
 		descs:  []*chunkDesc{},
 		hashes: map[uint64][]*chunkDesc{},
 		values: map[string]stringset{},
@@ -52,31 +52,28 @@ func OpenHeadBlock(dir string, l log.Logger) (*HeadBlock, error) {
 		wal:    wal,
 		mapper: newPositionMapper(nil),
 	}
-	b.bstats = &BlockStats{
+	b.stats = &BlockStats{
 		MinTime: math.MaxInt64,
 		MaxTime: math.MinInt64,
 	}
 
 	err = wal.ReadAll(&walHandler{
 		series: func(lset labels.Labels) {
 			b.create(lset.Hash(), lset)
+			b.stats.SeriesCount++
+			b.stats.ChunkCount++ // head block has one chunk/series
 		},
 		sample: func(s hashedSample) {
 			cd := b.descs[s.ref]
-
-			// Duplicated from appendBatch – TODO(fabxc): deduplicate?
-			if cd.lastTimestamp == s.t && cd.lastValue != s.v {
-				return
-			}
 			cd.append(s.t, s.v)
 
-			if s.t > b.bstats.MaxTime {
-				b.bstats.MaxTime = s.t
+			if s.t > b.stats.MaxTime {
+				b.stats.MaxTime = s.t
 			}
-			if s.t < b.bstats.MinTime {
-				b.bstats.MinTime = s.t
+			if s.t < b.stats.MinTime {
+				b.stats.MinTime = s.t
 			}
-			b.bstats.SampleCount++
+			b.stats.SampleCount++
 		},
 	})
 	if err != nil {
@@ -89,24 +83,29 @@ func OpenHeadBlock(dir string, l log.Logger) (*HeadBlock, error) {
 }
 
 // Close syncs all data and closes underlying resources of the head block.
-func (h *HeadBlock) Close() error {
+func (h *headBlock) Close() error {
 	return h.wal.Close()
 }
 
-func (h *HeadBlock) dir() string          { return h.d }
-func (h *HeadBlock) persisted() bool      { return false }
-func (h *HeadBlock) index() IndexReader   { return h }
-func (h *HeadBlock) series() SeriesReader { return h }
+func (h *headBlock) Dir() string          { return h.dir }
+func (h *headBlock) Persisted() bool      { return false }
+func (h *headBlock) Index() IndexReader   { return &headIndexReader{h} }
+func (h *headBlock) Series() SeriesReader { return &headSeriesReader{h} }
 
-func (h *HeadBlock) stats() BlockStats {
-	h.bstats.mtx.RLock()
-	defer h.bstats.mtx.RUnlock()
+// Stats returns statistics about the indexed data.
+func (h *headBlock) Stats() BlockStats {
+	h.stats.mtx.RLock()
+	defer h.stats.mtx.RUnlock()
 
-	return *h.bstats
+	return *h.stats
+}
+
+type headSeriesReader struct {
+	*headBlock
 }
 
 // Chunk returns the chunk for the reference number.
-func (h *HeadBlock) Chunk(ref uint32) (chunks.Chunk, error) {
+func (h *headSeriesReader) Chunk(ref uint32) (chunks.Chunk, error) {
 	h.mtx.RLock()
 	defer h.mtx.RUnlock()
@@ -116,22 +115,6 @@
 	if int(ref) >= len(h.descs) {
 		return nil, errNotFound
 	}
-	return h.descs[int(ref)].chunk, nil
+	return &safeChunk{cd: h.descs[int(ref)]}, nil
 }
 
-type headSeriesReader struct {
-	h *HeadBlock
-}
-
-func (h *headSeriesReader) Chunk(ref uint32) (chunks.Chunk, error) {
-	h.h.mtx.RLock()
-	defer h.h.mtx.RUnlock()
-
-	if int(ref) >= len(h.h.descs) {
-		return nil, errNotFound
-	}
-	return &safeChunk{
-		cd: h.h.descs[int(ref)],
-	}, nil
-}
-
 type safeChunk struct {
 	cd *chunkDesc
 }
@@ -147,21 +130,12 @@
 func (c *safeChunk) Appender() (chunks.Appender, error) { panic("illegal") }
 func (c *safeChunk) Bytes() []byte             { panic("illegal") }
 func (c *safeChunk) Encoding() chunks.Encoding { panic("illegal") }
 
-func (h *HeadBlock) interval() (int64, int64) {
-	h.bstats.mtx.RLock()
-	defer h.bstats.mtx.RUnlock()
-	return h.bstats.MinTime, h.bstats.MaxTime
-}
-
-// Stats returns statisitics about the indexed data.
-func (h *HeadBlock) Stats() (BlockStats, error) {
-	h.bstats.mtx.RLock()
-	defer h.bstats.mtx.RUnlock()
-	return *h.bstats, nil
+type headIndexReader struct {
+	*headBlock
 }
 
 // LabelValues returns the possible label values
-func (h *HeadBlock) LabelValues(names ...string) (StringTuples, error) {
+func (h *headIndexReader) LabelValues(names ...string) (StringTuples, error) {
 	h.mtx.RLock()
 	defer h.mtx.RUnlock()
@@ -179,7 +153,7 @@
 }
 
 // Postings returns the postings list iterator for the label pair.
-func (h *HeadBlock) Postings(name, value string) (Postings, error) {
+func (h *headIndexReader) Postings(name, value string) (Postings, error) {
 	h.mtx.RLock()
 	defer h.mtx.RUnlock()
@@ -187,7 +161,7 @@
 }
 
 // Series returns the series for the given reference.
-func (h *HeadBlock) Series(ref uint32) (labels.Labels, []ChunkMeta, error) {
+func (h *headIndexReader) Series(ref uint32) (labels.Labels, []ChunkMeta, error) {
 	h.mtx.RLock()
 	defer h.mtx.RUnlock()
@@ -206,7 +180,7 @@
 	return cd.lset, []ChunkMeta{meta}, nil
 }
 
-func (h *HeadBlock) LabelIndices() ([][]string, error) {
+func (h *headIndexReader) LabelIndices() ([][]string, error) {
 	h.mtx.RLock()
 	defer h.mtx.RUnlock()
@@ -218,9 +192,15 @@
 	return res, nil
 }
 
+func (h *headIndexReader) Stats() (BlockStats, error) {
+	h.stats.mtx.RLock()
+	defer h.stats.mtx.RUnlock()
+	return *h.stats, nil
+}
+
 // get retrieves the chunk with the hash and label set and creates
 // a new one if it doesn't exist yet.
-func (h *HeadBlock) get(hash uint64, lset labels.Labels) *chunkDesc {
+func (h *headBlock) get(hash uint64, lset labels.Labels) *chunkDesc {
 	cds := h.hashes[hash]
 
 	for _, cd := range cds {
@@ -231,7 +211,7 @@
 	return nil
 }
 
-func (h *HeadBlock) create(hash uint64, lset labels.Labels) *chunkDesc {
+func (h *headBlock) create(hash uint64, lset labels.Labels) *chunkDesc {
 	cd := &chunkDesc{
 		lset:  lset,
 		chunk: chunks.NewXORChunk(),
@@ -274,9 +254,12 @@ var (
 	// ErrAmendSample is returned if an appended sample has the same timestamp
 	// as the most recent sample but a different value.
 	ErrAmendSample = errors.New("amending sample")
+
+	// ErrOutOfBounds is returned if an appended sample is outside the valid time bounds.
+	ErrOutOfBounds = errors.New("out of bounds")
 )
 
-func (h *HeadBlock) appendBatch(samples []hashedSample) (int, error) {
+func (h *headBlock) appendBatch(samples []hashedSample) (int, error) {
 	// Find head chunks for all samples and allocate new IDs/refs for
 	// ones we haven't seen before.
 	var (
@@ -370,31 +352,31 @@
 		}
 	}
 
-	h.bstats.mtx.Lock()
-	defer h.bstats.mtx.Unlock()
+	h.stats.mtx.Lock()
+	defer h.stats.mtx.Unlock()
 
-	h.bstats.SampleCount += total
-	h.bstats.SeriesCount += uint64(len(newSeries))
-	h.bstats.ChunkCount += uint64(len(newSeries)) // head block has one chunk/series
+	h.stats.SampleCount += total
+	h.stats.SeriesCount += uint64(len(newSeries))
+	h.stats.ChunkCount += uint64(len(newSeries)) // head block has one chunk/series
 
-	if mint < h.bstats.MinTime {
-		h.bstats.MinTime = mint
+	if mint < h.stats.MinTime {
+		h.stats.MinTime = mint
 	}
-	if maxt > h.bstats.MaxTime {
-		h.bstats.MaxTime = maxt
+	if maxt > h.stats.MaxTime {
+		h.stats.MaxTime = maxt
 	}
 
 	return int(total), nil
 }
 
-func (h *HeadBlock) fullness() float64 {
-	h.bstats.mtx.RLock()
-	defer h.bstats.mtx.RUnlock()
+func (h *headBlock) fullness() float64 {
+	h.stats.mtx.RLock()
+	defer h.stats.mtx.RUnlock()
 
-	return float64(h.bstats.SampleCount) / float64(h.bstats.SeriesCount+1) / 250
+	return float64(h.stats.SampleCount) / float64(h.stats.SeriesCount+1) / 250
 }
 
-func (h *HeadBlock) updateMapping() {
+func (h *headBlock) updateMapping() {
 	h.mtx.RLock()
 
 	if h.mapper.sortable != nil && h.mapper.Len() == len(h.descs) {
@@ -418,7 +400,7 @@
 // of the series they reference.
 // Returned postings have no longer monotonic IDs and MUST NOT be used for regular
 // postings set operations, i.e. intersect and merge.
-func (h *HeadBlock) remapPostings(p Postings) Postings {
+func (h *headBlock) remapPostings(p Postings) Postings {
 	list, err := expandPostings(p)
 	if err != nil {
 		return errPostings{err: err}
diff --git a/querier.go b/querier.go
index bbbc7abf3..df17ad270 100644
--- a/querier.go
+++ b/querier.go
@@ -58,14 +58,13 @@ func (s *DB) Querier(mint, maxt int64) Querier {
 		q := &blockQuerier{
 			mint:   mint,
 			maxt:   maxt,
-			index:  b.index(),
-			series: b.series(),
+			index:  b.Index(),
+			series: b.Series(),
 		}
 
 		// TODO(fabxc): find nicer solution.
-		if hb, ok := b.(*HeadBlock); ok {
+		if hb, ok := b.(*headBlock); ok {
 			q.postingsMapper = hb.remapPostings
-			q.series = &headSeriesReader{h: hb}
 		}
 
 		sq.blocks = append(sq.blocks, q)
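
Usage note (not part of the diff): the exported Block interface drops the old interval() accessor, so callers now read the time range from the BlockStats copy returned by Stats(), exactly as the rewritten blocksForInterval does. A minimal sketch of the migrated call pattern, assuming only identifiers visible in this diff (Block, BlockStats, and db.go's intervalOverlap); the helper name overlaps is hypothetical:

// overlaps reports whether block b may contain samples within [mint, maxt].
// Before this change: bmin, bmax := b.interval().
func overlaps(b Block, mint, maxt int64) bool {
	s := b.Stats() // returns a copy, so no further locking is needed
	return intervalOverlap(mint, maxt, s.MinTime, s.MaxTime)
}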