diff --git a/block.go b/block.go index 3e7b5ad24..2ecc7687c 100644 --- a/block.go +++ b/block.go @@ -32,12 +32,12 @@ type DiskBlock interface { // Block is an interface to a DiskBlock that can also be queried. type Block interface { DiskBlock - // Queryable + Queryable } // HeadBlock is a regular block that can still be appended to. type HeadBlock interface { - DiskBlock + Block Appendable } @@ -52,7 +52,7 @@ type Appendable interface { // Queryable defines an entity which provides a Querier. type Queryable interface { - Queryable() Querier + Querier(mint, maxt int64) Querier } // BlockMeta provides meta information about a block. @@ -86,14 +86,6 @@ const ( flagStd = 1 ) -type persistedBlock struct { - dir string - meta BlockMeta - - chunkr *chunkReader - indexr *indexReader -} - type blockMeta struct { Version int `json:"version"` @@ -141,6 +133,14 @@ func writeMetaFile(dir string, meta *BlockMeta) error { return renameFile(tmp, path) } +type persistedBlock struct { + dir string + meta BlockMeta + + chunkr *chunkReader + indexr *indexReader +} + func newPersistedBlock(dir string) (*persistedBlock, error) { meta, err := readMetaFile(dir) if err != nil { @@ -174,6 +174,15 @@ func (pb *persistedBlock) Close() error { return merr.Err() } +func (pb *persistedBlock) Querier(mint, maxt int64) Querier { + return &blockQuerier{ + mint: mint, + maxt: maxt, + index: pb.Index(), + chunks: pb.Chunks(), + } +} + func (pb *persistedBlock) Dir() string { return pb.dir } func (pb *persistedBlock) Index() IndexReader { return pb.indexr } func (pb *persistedBlock) Chunks() ChunkReader { return pb.chunkr } diff --git a/db.go b/db.go index e84a9228a..6955477cd 100644 --- a/db.go +++ b/db.go @@ -95,7 +95,6 @@ type DB struct { // block layout. mtx sync.RWMutex blocks []Block - // seqBlocks map[int]Block // Mutex that must be held when modifying just the head blocks // or the general layout. @@ -270,9 +269,10 @@ func (db *DB) compact() (changes bool, err error) { db.headmtx.RUnlock() + db.logger.Log("msg", "picked singles", "singles", fmt.Sprintf("%v", singles)) Loop: for _, h := range singles { - db.logger.Log("msg", "write head", "seq", h.Meta().Sequence) + db.logger.Log("msg", "write head", "seq", h.Meta().Sequence, "dir", h.Dir(), "ulid", h.Meta().ULID) select { case <-db.stopc: @@ -660,7 +660,7 @@ func (db *DB) blocksForInterval(mint, maxt int64) []Block { // cut starts a new head block to append to. The completed head block // will still be appendable for the configured grace period. -func (db *DB) cut(mint int64) (*headBlock, error) { +func (db *DB) cut(mint int64) (HeadBlock, error) { maxt := mint + int64(db.opts.MinBlockDuration) dir, seq, err := nextSequenceFile(db.dir, "b-") diff --git a/head.go b/head.go index f801bc69c..61a131360 100644 --- a/head.go +++ b/head.go @@ -172,6 +172,22 @@ func (h *headBlock) Persisted() bool { return false } func (h *headBlock) Index() IndexReader { return &headIndexReader{h} } func (h *headBlock) Chunks() ChunkReader { return &headChunkReader{h} } +func (h *headBlock) Querier(mint, maxt int64) Querier { + h.mtx.RLock() + defer h.mtx.RUnlock() + + if h.closed { + panic(fmt.Sprintf("block %s already closed", h.dir)) + } + return &blockQuerier{ + mint: mint, + maxt: maxt, + index: h.Index(), + chunks: h.Chunks(), + postingsMapper: h.remapPostings, + } +} + func (h *headBlock) Appender() Appender { atomic.AddUint64(&h.activeWriters, 1) diff --git a/querier.go b/querier.go index c027a8dc0..555731171 100644 --- a/querier.go +++ b/querier.go @@ -55,27 +55,8 @@ func (s *DB) Querier(mint, maxt int64) Querier { blocks: make([]Querier, 0, len(blocks)), db: s, } - for _, b := range blocks { - q := &blockQuerier{ - mint: mint, - maxt: maxt, - index: b.Index(), - chunks: b.Chunks(), - } - - // TODO(fabxc): find nicer solution. - if hb, ok := b.(*headBlock); ok { - // TODO(fabxc): temporary refactored. - hb.mtx.RLock() - if hb.closed { - panic(fmt.Sprintf("block %s already closed", hb.dir)) - } - hb.mtx.RUnlock() - q.postingsMapper = hb.remapPostings - } - - sq.blocks = append(sq.blocks, q) + sq.blocks = append(sq.blocks, b.Querier(mint, maxt)) } return sq