Add Queryable interface to Block

This adds the Queryable interface to the Block interface. Head and
persisted blocks now implement their own Querier() method and thus
isolate customization (e.g. remapPostings) more cleanly.
This commit is contained in:
Fabian Reinartz 2017-03-20 10:21:21 +01:00
parent 11be2cc585
commit 2c999836fb
4 changed files with 40 additions and 34 deletions

View File

@ -32,12 +32,12 @@ type DiskBlock interface {
// Block is an interface to a DiskBlock that can also be queried. // Block is an interface to a DiskBlock that can also be queried.
type Block interface { type Block interface {
DiskBlock DiskBlock
// Queryable Queryable
} }
// HeadBlock is a regular block that can still be appended to. // HeadBlock is a regular block that can still be appended to.
type HeadBlock interface { type HeadBlock interface {
DiskBlock Block
Appendable Appendable
} }
@ -52,7 +52,7 @@ type Appendable interface {
// Queryable defines an entity which provides a Querier. // Queryable defines an entity which provides a Querier.
type Queryable interface { type Queryable interface {
Queryable() Querier Querier(mint, maxt int64) Querier
} }
// BlockMeta provides meta information about a block. // BlockMeta provides meta information about a block.
@ -86,14 +86,6 @@ const (
flagStd = 1 flagStd = 1
) )
type persistedBlock struct {
dir string
meta BlockMeta
chunkr *chunkReader
indexr *indexReader
}
type blockMeta struct { type blockMeta struct {
Version int `json:"version"` Version int `json:"version"`
@ -141,6 +133,14 @@ func writeMetaFile(dir string, meta *BlockMeta) error {
return renameFile(tmp, path) return renameFile(tmp, path)
} }
type persistedBlock struct {
dir string
meta BlockMeta
chunkr *chunkReader
indexr *indexReader
}
func newPersistedBlock(dir string) (*persistedBlock, error) { func newPersistedBlock(dir string) (*persistedBlock, error) {
meta, err := readMetaFile(dir) meta, err := readMetaFile(dir)
if err != nil { if err != nil {
@ -174,6 +174,15 @@ func (pb *persistedBlock) Close() error {
return merr.Err() return merr.Err()
} }
func (pb *persistedBlock) Querier(mint, maxt int64) Querier {
return &blockQuerier{
mint: mint,
maxt: maxt,
index: pb.Index(),
chunks: pb.Chunks(),
}
}
func (pb *persistedBlock) Dir() string { return pb.dir } func (pb *persistedBlock) Dir() string { return pb.dir }
func (pb *persistedBlock) Index() IndexReader { return pb.indexr } func (pb *persistedBlock) Index() IndexReader { return pb.indexr }
func (pb *persistedBlock) Chunks() ChunkReader { return pb.chunkr } func (pb *persistedBlock) Chunks() ChunkReader { return pb.chunkr }

6
db.go
View File

@ -95,7 +95,6 @@ type DB struct {
// block layout. // block layout.
mtx sync.RWMutex mtx sync.RWMutex
blocks []Block blocks []Block
// seqBlocks map[int]Block
// Mutex that must be held when modifying just the head blocks // Mutex that must be held when modifying just the head blocks
// or the general layout. // or the general layout.
@ -270,9 +269,10 @@ func (db *DB) compact() (changes bool, err error) {
db.headmtx.RUnlock() db.headmtx.RUnlock()
db.logger.Log("msg", "picked singles", "singles", fmt.Sprintf("%v", singles))
Loop: Loop:
for _, h := range singles { for _, h := range singles {
db.logger.Log("msg", "write head", "seq", h.Meta().Sequence) db.logger.Log("msg", "write head", "seq", h.Meta().Sequence, "dir", h.Dir(), "ulid", h.Meta().ULID)
select { select {
case <-db.stopc: case <-db.stopc:
@ -660,7 +660,7 @@ func (db *DB) blocksForInterval(mint, maxt int64) []Block {
// cut starts a new head block to append to. The completed head block // cut starts a new head block to append to. The completed head block
// will still be appendable for the configured grace period. // will still be appendable for the configured grace period.
func (db *DB) cut(mint int64) (*headBlock, error) { func (db *DB) cut(mint int64) (HeadBlock, error) {
maxt := mint + int64(db.opts.MinBlockDuration) maxt := mint + int64(db.opts.MinBlockDuration)
dir, seq, err := nextSequenceFile(db.dir, "b-") dir, seq, err := nextSequenceFile(db.dir, "b-")

16
head.go
View File

@ -172,6 +172,22 @@ func (h *headBlock) Persisted() bool { return false }
func (h *headBlock) Index() IndexReader { return &headIndexReader{h} } func (h *headBlock) Index() IndexReader { return &headIndexReader{h} }
func (h *headBlock) Chunks() ChunkReader { return &headChunkReader{h} } func (h *headBlock) Chunks() ChunkReader { return &headChunkReader{h} }
func (h *headBlock) Querier(mint, maxt int64) Querier {
h.mtx.RLock()
defer h.mtx.RUnlock()
if h.closed {
panic(fmt.Sprintf("block %s already closed", h.dir))
}
return &blockQuerier{
mint: mint,
maxt: maxt,
index: h.Index(),
chunks: h.Chunks(),
postingsMapper: h.remapPostings,
}
}
func (h *headBlock) Appender() Appender { func (h *headBlock) Appender() Appender {
atomic.AddUint64(&h.activeWriters, 1) atomic.AddUint64(&h.activeWriters, 1)

View File

@ -55,27 +55,8 @@ func (s *DB) Querier(mint, maxt int64) Querier {
blocks: make([]Querier, 0, len(blocks)), blocks: make([]Querier, 0, len(blocks)),
db: s, db: s,
} }
for _, b := range blocks { for _, b := range blocks {
q := &blockQuerier{ sq.blocks = append(sq.blocks, b.Querier(mint, maxt))
mint: mint,
maxt: maxt,
index: b.Index(),
chunks: b.Chunks(),
}
// TODO(fabxc): find nicer solution.
if hb, ok := b.(*headBlock); ok {
// TODO(fabxc): temporary refactored.
hb.mtx.RLock()
if hb.closed {
panic(fmt.Sprintf("block %s already closed", hb.dir))
}
hb.mtx.RUnlock()
q.postingsMapper = hb.remapPostings
}
sq.blocks = append(sq.blocks, q)
} }
return sq return sq