diff --git a/block.go b/block.go index 94f57f206..7843f232d 100644 --- a/block.go +++ b/block.go @@ -41,6 +41,9 @@ type DiskBlock interface { // Chunks returns a ChunkReader over the block's data. Chunks() ChunkReader + // Tombstones returns a TombstoneReader over the block's deleted data. + Tombstones() TombstoneReader + // Delete deletes data from the block. Delete(mint, maxt int64, ms ...labels.Matcher) error @@ -241,6 +244,10 @@ type persistedBlock struct { chunkr *chunkReader indexr *indexReader + + // For tombstones. + stones []uint32 + tombstones map[uint32][]trange } func newPersistedBlock(dir string) (*persistedBlock, error) { @@ -258,11 +265,23 @@ func newPersistedBlock(dir string) (*persistedBlock, error) { return nil, err } + tr, err := readTombstoneFile(dir) + if err != nil { + return nil, err + } + ts := make(map[uint32][]trange) + for tr.Next() { + s := tr.At() + ts[s.ref] = s.ranges + } + pb := &persistedBlock{ dir: dir, meta: *meta, chunkr: cr, indexr: ir, + + tombstones: ts, } return pb, nil } @@ -292,7 +311,10 @@ func (pb *persistedBlock) Querier(mint, maxt int64) Querier { func (pb *persistedBlock) Dir() string { return pb.dir } func (pb *persistedBlock) Index() IndexReader { return pb.indexr } func (pb *persistedBlock) Chunks() ChunkReader { return pb.chunkr } -func (pb *persistedBlock) Meta() BlockMeta { return pb.meta } +func (pb *persistedBlock) Tombstones() TombstoneReader { + return newMapTombstoneReader(pb.tombstones) +} +func (pb *persistedBlock) Meta() BlockMeta { return pb.meta } func (pb *persistedBlock) Delete(mint, maxt int64, ms ...labels.Matcher) error { pr := newPostingsReader(pb.indexr) @@ -348,6 +370,7 @@ type stone struct { // TombstoneReader is the iterator over tombstones. type TombstoneReader interface { Next() bool + Seek(ref uint32) bool At() stone Err() error } @@ -402,6 +425,16 @@ func (t *tombstoneReader) Next() bool { return t.idx < t.len } +func (t *tombstoneReader) Seek(ref uint32) bool { + bytIdx := t.idx * 12 + + t.idx += sort.Search(t.len-t.idx, func(i int) bool { + return binary.BigEndian.Uint32(t.b[bytIdx+i*12:]) >= ref + }) + + return t.idx < t.len +} + func (t *tombstoneReader) At() stone { bytIdx := t.idx * (4 + 8) dat := t.stones[bytIdx : bytIdx+12] @@ -443,6 +476,7 @@ type mapTombstoneReader struct { stones map[uint32][]trange } +// TODO(gouthamve): Take pre-sorted refs. func newMapTombstoneReader(ts map[uint32][]trange) *mapTombstoneReader { refs := make([]uint32, 0, len(ts)) for k := range ts { @@ -463,6 +497,25 @@ func (t *mapTombstoneReader) Next() bool { return false } +func (t *mapTombstoneReader) Seek(ref uint32) bool { + // If the current value satisfies, then return. + if t.cur >= ref { + return true + } + + // Do binary search between current position and end. + i := sort.Search(len(t.refs), func(i int) bool { + return t.refs[i] >= ref + }) + if i < len(t.refs) { + t.cur = t.refs[i] + t.refs = t.refs[i+1:] + return true + } + t.refs = nil + return false +} + func (t *mapTombstoneReader) At() stone { return stone{ref: t.cur, ranges: t.stones[t.cur]} } @@ -492,6 +545,25 @@ func (t *simpleTombstoneReader) Next() bool { return false } +func (t *simpleTombstoneReader) Seek(ref uint32) bool { + // If the current value satisfies, then return. + if t.cur >= ref { + return true + } + + // Do binary search between current position and end. + i := sort.Search(len(t.refs), func(i int) bool { + return t.refs[i] >= ref + }) + if i < len(t.refs) { + t.cur = t.refs[i] + t.refs = t.refs[i+1:] + return true + } + t.refs = nil + return false +} + func (t *simpleTombstoneReader) At() stone { return stone{ref: t.cur, ranges: t.ranges} } @@ -554,6 +626,17 @@ func (t *mergedTombstoneReader) Next() bool { return true } +func (t *mergedTombstoneReader) Seek(ref uint32) bool { + if t.cur.ref >= ref { + return true + } + + t.aok = t.a.Seek(ref) + t.bok = t.b.Seek(ref) + t.initialized = true + + return t.Next() +} func (t *mergedTombstoneReader) At() stone { return t.cur } diff --git a/head.go b/head.go index 4b59a9d74..559d1100c 100644 --- a/head.go +++ b/head.go @@ -210,19 +210,24 @@ func (h *HeadBlock) Meta() BlockMeta { return m } -// Dir implements headBlock +// Dir implements headBlock. func (h *HeadBlock) Dir() string { return h.dir } -// Persisted implements headBlock +// Persisted implements headBlock. func (h *HeadBlock) Persisted() bool { return false } -// Index implements headBlock +// Index implements headBlock. func (h *HeadBlock) Index() IndexReader { return &headIndexReader{h} } -// Chunks implements headBlock +// Chunks implements headBlock. func (h *HeadBlock) Chunks() ChunkReader { return &headChunkReader{h} } -// Delete implements headBlock +// Tombstones implements headBlock. +func (h *HeadBlock) Tombstones() TombstoneReader { + return newMapTombstoneReader(h.tombstones) +} + +// Delete implements headBlock. func (h *HeadBlock) Delete(mint int64, maxt int64, ms ...labels.Matcher) error { h.mtx.RLock() @@ -246,13 +251,7 @@ Outer: } } - rs, ok := h.tombstones[ref] - if !ok { - h.tombstones[ref] = []trange{{mint, maxt}} - continue - } - - h.tombstones[ref] = addNewInterval(rs, trange{mint, maxt}) + h.tombstones[ref] = addNewInterval(h.tombstones[ref], trange{mint, maxt}) } if p.Err() != nil { @@ -262,7 +261,7 @@ Outer: return writeTombstoneFile(h.dir, newMapTombstoneReader(h.tombstones)) } -// Querier implements Queryable and headBlock +// Querier implements Queryable and headBlock. func (h *HeadBlock) Querier(mint, maxt int64) Querier { h.mtx.RLock() defer h.mtx.RUnlock() diff --git a/querier.go b/querier.go index 86dd76b99..a7b67ea2f 100644 --- a/querier.go +++ b/querier.go @@ -126,8 +126,9 @@ func (q *querier) Close() error { // blockQuerier provides querying access to a single block database. type blockQuerier struct { - index IndexReader - chunks ChunkReader + index IndexReader + chunks ChunkReader + tombstones TombstoneReader postingsMapper func(Postings) Postings