From 2c999836fbec923495ad2818d1f94df8a0c22876 Mon Sep 17 00:00:00 2001 From: Fabian Reinartz Date: Mon, 20 Mar 2017 10:21:21 +0100 Subject: [PATCH] Add Queryable interface to Block This adds the Queryable interface to the Block interface. Head and persisted blocks now implement their own Querier() method and thus isolate customization (e.g. remapPostings) more cleanly. --- block.go | 31 ++++++++++++++++++++----------- db.go | 6 +++--- head.go | 16 ++++++++++++++++ querier.go | 21 +-------------------- 4 files changed, 40 insertions(+), 34 deletions(-) diff --git a/block.go b/block.go index 3e7b5ad24..2ecc7687c 100644 --- a/block.go +++ b/block.go @@ -32,12 +32,12 @@ type DiskBlock interface { // Block is an interface to a DiskBlock that can also be queried. type Block interface { DiskBlock - // Queryable + Queryable } // HeadBlock is a regular block that can still be appended to. type HeadBlock interface { - DiskBlock + Block Appendable } @@ -52,7 +52,7 @@ type Appendable interface { // Queryable defines an entity which provides a Querier. type Queryable interface { - Queryable() Querier + Querier(mint, maxt int64) Querier } // BlockMeta provides meta information about a block. @@ -86,14 +86,6 @@ const ( flagStd = 1 ) -type persistedBlock struct { - dir string - meta BlockMeta - - chunkr *chunkReader - indexr *indexReader -} - type blockMeta struct { Version int `json:"version"` @@ -141,6 +133,14 @@ func writeMetaFile(dir string, meta *BlockMeta) error { return renameFile(tmp, path) } +type persistedBlock struct { + dir string + meta BlockMeta + + chunkr *chunkReader + indexr *indexReader +} + func newPersistedBlock(dir string) (*persistedBlock, error) { meta, err := readMetaFile(dir) if err != nil { @@ -174,6 +174,15 @@ func (pb *persistedBlock) Close() error { return merr.Err() } +func (pb *persistedBlock) Querier(mint, maxt int64) Querier { + return &blockQuerier{ + mint: mint, + maxt: maxt, + index: pb.Index(), + chunks: pb.Chunks(), + } +} + func (pb *persistedBlock) Dir() string { return pb.dir } func (pb *persistedBlock) Index() IndexReader { return pb.indexr } func (pb *persistedBlock) Chunks() ChunkReader { return pb.chunkr } diff --git a/db.go b/db.go index e84a9228a..6955477cd 100644 --- a/db.go +++ b/db.go @@ -95,7 +95,6 @@ type DB struct { // block layout. mtx sync.RWMutex blocks []Block - // seqBlocks map[int]Block // Mutex that must be held when modifying just the head blocks // or the general layout. @@ -270,9 +269,10 @@ func (db *DB) compact() (changes bool, err error) { db.headmtx.RUnlock() + db.logger.Log("msg", "picked singles", "singles", fmt.Sprintf("%v", singles)) Loop: for _, h := range singles { - db.logger.Log("msg", "write head", "seq", h.Meta().Sequence) + db.logger.Log("msg", "write head", "seq", h.Meta().Sequence, "dir", h.Dir(), "ulid", h.Meta().ULID) select { case <-db.stopc: @@ -660,7 +660,7 @@ func (db *DB) blocksForInterval(mint, maxt int64) []Block { // cut starts a new head block to append to. The completed head block // will still be appendable for the configured grace period. -func (db *DB) cut(mint int64) (*headBlock, error) { +func (db *DB) cut(mint int64) (HeadBlock, error) { maxt := mint + int64(db.opts.MinBlockDuration) dir, seq, err := nextSequenceFile(db.dir, "b-") diff --git a/head.go b/head.go index f801bc69c..61a131360 100644 --- a/head.go +++ b/head.go @@ -172,6 +172,22 @@ func (h *headBlock) Persisted() bool { return false } func (h *headBlock) Index() IndexReader { return &headIndexReader{h} } func (h *headBlock) Chunks() ChunkReader { return &headChunkReader{h} } +func (h *headBlock) Querier(mint, maxt int64) Querier { + h.mtx.RLock() + defer h.mtx.RUnlock() + + if h.closed { + panic(fmt.Sprintf("block %s already closed", h.dir)) + } + return &blockQuerier{ + mint: mint, + maxt: maxt, + index: h.Index(), + chunks: h.Chunks(), + postingsMapper: h.remapPostings, + } +} + func (h *headBlock) Appender() Appender { atomic.AddUint64(&h.activeWriters, 1) diff --git a/querier.go b/querier.go index c027a8dc0..555731171 100644 --- a/querier.go +++ b/querier.go @@ -55,27 +55,8 @@ func (s *DB) Querier(mint, maxt int64) Querier { blocks: make([]Querier, 0, len(blocks)), db: s, } - for _, b := range blocks { - q := &blockQuerier{ - mint: mint, - maxt: maxt, - index: b.Index(), - chunks: b.Chunks(), - } - - // TODO(fabxc): find nicer solution. - if hb, ok := b.(*headBlock); ok { - // TODO(fabxc): temporary refactored. - hb.mtx.RLock() - if hb.closed { - panic(fmt.Sprintf("block %s already closed", hb.dir)) - } - hb.mtx.RUnlock() - q.postingsMapper = hb.remapPostings - } - - sq.blocks = append(sq.blocks, q) + sq.blocks = append(sq.blocks, b.Querier(mint, maxt)) } return sq