diff --git a/block.go b/block.go index 9f3574464..69c351ccc 100644 --- a/block.go +++ b/block.go @@ -218,10 +218,11 @@ func (pb *persistedBlock) String() string { func (pb *persistedBlock) Querier(mint, maxt int64) Querier { return &blockQuerier{ - mint: mint, - maxt: maxt, - index: pb.Index(), - chunks: pb.Chunks(), + mint: mint, + maxt: maxt, + index: pb.Index(), + chunks: pb.Chunks(), + tombstones: pb.tombstones.Copy(), } } @@ -255,10 +256,8 @@ Outer: } } - // XXX(gouthamve): Adjust mint and maxt to match the time-range in the chunks? for _, chk := range chunks { - if (mint <= chk.MinTime && maxt >= chk.MinTime) || - (mint > chk.MinTime && mint <= chk.MaxTime) { + if intervalOverlap(mint, maxt, chk.MinTime, chk.MaxTime) { vPostings = append(vPostings, p.At()) continue Outer } diff --git a/compact.go b/compact.go index d63f5cec7..7ba22f5f7 100644 --- a/compact.go +++ b/compact.go @@ -307,8 +307,7 @@ func (c *compactor) populate(blocks []Block, indexw IndexWriter, chunkw ChunkWri if len(ranges) > 0 { // Re-encode the chunk to not have deleted values. for _, chk := range chks { - // Checks Overlap: http://stackoverflow.com/questions/3269434/ - if ranges[0].mint <= chk.MaxTime && chk.MinTime <= ranges[len(ranges)-1].maxt { + if intervalOverlap(ranges[0].mint, ranges[len(ranges)-1].maxt, chk.MinTime, chk.MaxTime) { newChunk := chunks.NewXORChunk() app, err := newChunk.Appender() if err != nil { diff --git a/db.go b/db.go index c4e97abcc..855a5af6e 100644 --- a/db.go +++ b/db.go @@ -669,6 +669,32 @@ func (a *dbAppender) Rollback() error { return g.Wait() } +// Delete implements deletion of metrics. +func (db *DB) Delete(mint, maxt int64, ms ...labels.Matcher) error { + s.mtx.RLock() + + s.headmtx.RLock() + blocks := s.blocksForInterval(mint, maxt) + s.headmtx.RUnlock() + + // TODO(gouthamve): Wait for pending compactions and stop compactions until + // delete finishes. + var g errgroup.Group + + for _, b := range blocks { + f := func() error { + return b.Delete(mint, maxt, ms...) + } + g.Go(f) + } + + if err := g.Wait(); err != nil { + return err + } + + return db.reloadBlocks() +} + // appendable returns a copy of a slice of HeadBlocks that can still be appended to. func (db *DB) appendable() []headBlock { var i int @@ -681,10 +707,8 @@ func (db *DB) appendable() []headBlock { } func intervalOverlap(amin, amax, bmin, bmax int64) bool { - if bmin >= amin && bmin <= amax { - return true - } - if amin >= bmin && amin <= bmax { + // Checks Overlap: http://stackoverflow.com/questions/3269434/ + if amin <= bmax && bmin <= amax { return true } return false