From 5579efbd5bb3a478ae1d849ab3ae6024b4ab43eb Mon Sep 17 00:00:00 2001
From: Goutham Veeramachaneni
Date: Sun, 14 May 2017 14:36:26 +0530
Subject: [PATCH] Initial implementation of Deletes on persistedBlock

Very much a WIP

Signed-off-by: Goutham Veeramachaneni
---
 block.go            | 162 ++++++++++++++++++++++++++++++++++++++++++++
 chunks.go           |  64 +++++++++++++++++
 compact.go          |   9 +++
 encoding_helpers.go |   6 +-
 head.go             |  20 +++++-
 index.go            |  29 ++++++++
 6 files changed, 285 insertions(+), 5 deletions(-)

diff --git a/block.go b/block.go
index 72ebb1f8a..40126a362 100644
--- a/block.go
+++ b/block.go
@@ -14,14 +14,17 @@ package tsdb
 
 import (
+	"bufio"
 	"encoding/json"
 	"fmt"
 	"io/ioutil"
 	"os"
 	"path/filepath"
+	"sort"
 
 	"github.com/oklog/ulid"
 	"github.com/pkg/errors"
+	"github.com/prometheus/tsdb/labels"
 )
 
 // DiskBlock handles reads against a Block of time series data.
@@ -38,6 +41,9 @@ type DiskBlock interface {
 	// Chunks returns a ChunkReader over the block's data.
 	Chunks() ChunkReader
 
+	// Delete deletes data from the block.
+	Delete(mint, maxt int64, ms ...labels.Matcher) error
+
 	// Close releases all underlying resources of the block.
 	Close() error
 }
@@ -106,6 +112,7 @@ type blockMeta struct {
 }
 
 const metaFilename = "meta.json"
+const tombstoneFilename = "tombstones"
 
 func readMetaFile(dir string) (*BlockMeta, error) {
 	b, err := ioutil.ReadFile(filepath.Join(dir, metaFilename))
@@ -207,6 +214,161 @@ func (pb *persistedBlock) Index() IndexReader { return pb.indexr }
 func (pb *persistedBlock) Chunks() ChunkReader { return pb.chunkr }
 func (pb *persistedBlock) Meta() BlockMeta { return pb.meta }
 
+func (pb *persistedBlock) Delete(mint, maxt int64, ms ...labels.Matcher) error {
+	pr := newPostingsReader(pb.indexr)
+	p, absent := pr.Select(ms...)
+
+	ir := pb.indexr
+
+	// Choose only valid postings which have chunks in the time-range.
+	vPostings := []uint32{}
+
+Outer:
+	for p.Next() {
+		lset, chunks, err := ir.Series(p.At())
+		if err != nil {
+			return err
+		}
+
+		for _, abs := range absent {
+			if lset.Get(abs) != "" {
+				continue Outer
+			}
+		}
+
+		// XXX(gouthamve): Adjust mint and maxt to match the time-range in the chunks?
+		for _, chk := range chunks {
+			if (mint <= chk.MinTime && maxt >= chk.MinTime) ||
+				(mint > chk.MinTime && mint <= chk.MaxTime) {
+				vPostings = append(vPostings, p.At())
+				break
+			}
+		}
+	}
+
+	if p.Err() != nil {
+		return p.Err()
+	}
+
+	// Merge the current and new tombstones. Both lists are sorted by ref.
+	tr := ir.tombstones()
+	stones := make([]rip, 0, len(vPostings))
+	i := 0
+	for tr.Next() {
+		stone := tr.At()
+
+		// Emit the new tombstones that sort before the current one.
+		for i < len(vPostings) && vPostings[i] < stone.ref {
+			stones = append(stones, rip{ref: vPostings[i], mint: mint, maxt: maxt})
+			i++
+		}
+
+		// Same series: widen the existing tombstone to cover the new range.
+		if i < len(vPostings) && stone.ref == vPostings[i] {
+			if stone.mint > mint {
+				stone.mint = mint
+			}
+			if stone.maxt < maxt {
+				stone.maxt = maxt
+			}
+			i++
+		}
+
+		stones = append(stones, stone)
+	}
+
+	// Emit the new tombstones that sort after all existing ones.
+	for ; i < len(vPostings); i++ {
+		stones = append(stones, rip{ref: vPostings[i], mint: mint, maxt: maxt})
+	}
+
+	path := filepath.Join(pb.dir, tombstoneFilename)
+	tmp := path + ".tmp"
+
+	f, err := os.Create(tmp)
+	if err != nil {
+		return err
+	}
+
+	// TODO: Proper format and all.
+	buf := encbuf{b: make([]byte, 0, 20)}
+	fbuf := bufio.NewWriterSize(f, 20)
+
+	for _, stone := range stones {
+		buf.reset()
+		buf.putBE32(stone.ref)
+		buf.putBE64int64(stone.mint)
+		buf.putBE64int64(stone.maxt)
+
+		if _, err := fbuf.Write(buf.get()); err != nil {
+			return err
+		}
+	}
+
+	if err := fbuf.Flush(); err != nil {
+		return err
+	}
+	if err := f.Close(); err != nil {
+		return err
+	}
+
+	return renameFile(tmp, path)
+}
+
+// rip (after rest-in-peace) holds the information on the posting and time-range
+// that is deleted.
+type rip struct {
+	ref        uint32
+	mint, maxt int64
+}
+
+// TODO(gouthamve): Move to a cursor-based reader and avoid the byte-slice
+// copies, as done for bigEndianPostings.
+type tombstoneReader struct {
+	data []byte
+	idx  int
+	len  int
+}
+
+func newTombStoneReader(data []byte) *tombstoneReader {
+	// TODO(gouthamve): Error handling.
+	return &tombstoneReader{data: data, idx: -1, len: len(data) / 20}
+}
+
+func (t *tombstoneReader) Next() bool {
+	t.idx++
+	return t.idx < t.len
+}
+
+func (t *tombstoneReader) At() rip {
+	bytIdx := t.idx * (4 + 8 + 8)
+	dat := t.data[bytIdx : bytIdx+20]
+
+	db := &decbuf{b: dat}
+	ref := db.be32()
+	mint := db.be64int64()
+	maxt := db.be64int64()
+
+	// TODO(gouthamve): Handle errors.
+	return rip{ref: ref, mint: mint, maxt: maxt}
+}
+
+func (t *tombstoneReader) Seek(ref uint32) bool {
+	// Position on the first record if nothing has been read yet.
+	if t.idx < 0 {
+		t.idx = 0
+	}
+	if t.idx >= t.len {
+		return false
+	}
+	if s := t.At(); s.ref >= ref {
+		return true
+	}
+
+	i := sort.Search(t.len-t.idx, func(i int) bool {
+		bytIdx := (t.idx + i) * 20
+		dat := t.data[bytIdx : bytIdx+20]
+
+		db := &decbuf{b: dat}
+		return db.be32() >= ref
+	})
+
+	t.idx += i
+	return t.idx < t.len
+}
+
+func (t *tombstoneReader) Err() error {
+	return nil
+}
+
 func chunkDir(dir string) string { return filepath.Join(dir, "chunks") }
 func walDir(dir string) string   { return filepath.Join(dir, "wal") }
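The tombstones file written by Delete above is a flat sequence of fixed-size 20-byte records: a big-endian uint32 series reference followed by two big-endian int64 timestamps (mint, maxt). A minimal standalone sketch of that record layout, substituting encoding/binary for the patch's encbuf/decbuf helpers:

```go
// Sketch of the 20-byte tombstone record layout used by Delete:
// BE uint32 series ref, then BE int64 mint, then BE int64 maxt.
package main

import (
	"bytes"
	"encoding/binary"
	"fmt"
)

type stone struct {
	ref        uint32
	mint, maxt int64
}

// encode packs one tombstone into its fixed 20-byte on-disk form.
func encode(s stone) []byte {
	b := make([]byte, 20)
	binary.BigEndian.PutUint32(b[0:4], s.ref)
	binary.BigEndian.PutUint64(b[4:12], uint64(s.mint))
	binary.BigEndian.PutUint64(b[12:20], uint64(s.maxt))
	return b
}

// decode is the inverse of encode.
func decode(b []byte) stone {
	return stone{
		ref:  binary.BigEndian.Uint32(b[0:4]),
		mint: int64(binary.BigEndian.Uint64(b[4:12])),
		maxt: int64(binary.BigEndian.Uint64(b[12:20])),
	}
}

func main() {
	s := stone{ref: 42, mint: 1000, maxt: 2000}
	b := encode(s)
	fmt.Println(bytes.Equal(b, encode(decode(b)))) // true: the record round-trips
	fmt.Printf("%+v\n", decode(b))
}
```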
diff --git a/chunks.go b/chunks.go
index 77663359c..1edb6d2ea 100644
--- a/chunks.go
+++ b/chunks.go
@@ -41,6 +41,18 @@ type ChunkMeta struct {
 	Chunk chunks.Chunk
 	MinTime, MaxTime int64 // time range the data covers
+
+	// To handle deleted time-ranges.
+	deleted bool
+	dranges []trange
+}
+
+type trange struct {
+	mint, maxt int64
+}
+
+func (tr trange) inBounds(t int64) bool {
+	return t >= tr.mint && t <= tr.maxt
 }
 
 // writeHash writes the chunk encoding and raw data into the provided hash.
@@ -54,6 +66,58 @@ func (cm *ChunkMeta) writeHash(h hash.Hash) error {
 	return nil
 }
 
+// Iterator returns a chunks.Iterator that honors any deleted ranges.
+// If there is no deleted range then the underlying iterator is returned.
+func (cm *ChunkMeta) Iterator() chunks.Iterator {
+	if cm.Chunk == nil {
+		return nil
+	}
+
+	if cm.deleted {
+		return &deletedIterator{it: cm.Chunk.Iterator(), dranges: cm.dranges}
+	}
+
+	return cm.Chunk.Iterator()
+}
+
+// deletedIterator wraps an Iterator and makes sure any samples in deleted
+// time-ranges are not returned.
+type deletedIterator struct {
+	it chunks.Iterator
+
+	dranges []trange
+}
+
+func (it *deletedIterator) At() (int64, float64) {
+	return it.it.At()
+}
+
+func (it *deletedIterator) Next() bool {
+Outer:
+	for it.it.Next() {
+		ts, _ := it.it.At()
+
+		for _, tr := range it.dranges {
+			if tr.inBounds(ts) {
+				continue Outer
+			}
+			if ts > tr.maxt {
+				it.dranges = it.dranges[1:]
+				continue
+			}
+
+			return true
+		}
+
+		return true
+	}
+
+	return false
+}
+
+func (it *deletedIterator) Err() error {
+	return it.it.Err()
+}
+
 // ChunkWriter serializes a time block of chunked series data.
 type ChunkWriter interface {
 	// WriteChunks writes several chunks. The Chunk field of the ChunkMetas
 	// must be populated.
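To see the skipping logic of deletedIterator in isolation, here is a self-contained sketch; the Iterator interface and sliceIterator below are local stand-ins for chunks.Iterator, not part of the patch. Samples at t=2 and t=3 fall inside the deleted range and are dropped:

```go
package main

import "fmt"

// Iterator is a local stand-in for chunks.Iterator.
type Iterator interface {
	At() (int64, float64)
	Next() bool
	Err() error
}

// sliceIterator yields the given timestamps with dummy values.
type sliceIterator struct {
	ts  []int64
	idx int
}

func (s *sliceIterator) At() (int64, float64) { return s.ts[s.idx], 0 }
func (s *sliceIterator) Next() bool           { s.idx++; return s.idx < len(s.ts) }
func (s *sliceIterator) Err() error           { return nil }

type trange struct{ mint, maxt int64 }

func (tr trange) inBounds(t int64) bool { return t >= tr.mint && t <= tr.maxt }

// deletedIterator mirrors the patch: dranges must be sorted and non-overlapping.
type deletedIterator struct {
	it      Iterator
	dranges []trange
}

func (it *deletedIterator) At() (int64, float64) { return it.it.At() }
func (it *deletedIterator) Err() error           { return it.it.Err() }

func (it *deletedIterator) Next() bool {
Outer:
	for it.it.Next() {
		ts, _ := it.it.At()
		for _, tr := range it.dranges {
			if tr.inBounds(ts) {
				continue Outer // sample is deleted, skip it
			}
			if ts > tr.maxt {
				it.dranges = it.dranges[1:] // range is fully behind us, drop it
				continue
			}
			return true
		}
		return true
	}
	return false
}

func main() {
	it := &deletedIterator{
		it:      &sliceIterator{ts: []int64{1, 2, 3, 4, 5}, idx: -1},
		dranges: []trange{{mint: 2, maxt: 3}},
	}
	for it.Next() {
		ts, _ := it.At()
		fmt.Println(ts) // prints 1, 4, 5
	}
}
```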
diff --git a/compact.go b/compact.go
index 938697419..39e780b03 100644
--- a/compact.go
+++ b/compact.go
@@ -240,6 +240,15 @@ func (c *compactor) write(dir string, blocks ...Block) (err error) {
 		return errors.Wrap(err, "close index writer")
 	}
 
+	// Create an empty tombstones file.
+	tf, err := os.Create(filepath.Join(tmp, tombstoneFilename))
+	if err != nil {
+		return errors.Wrap(err, "touch tombstones file")
+	}
+	if err := tf.Close(); err != nil {
+		return errors.Wrap(err, "close tombstones file")
+	}
+
 	// Block successfully written, make visible and remove old ones.
 	if err := renameFile(tmp, dir); err != nil {
 		return errors.Wrap(err, "rename block dir")
diff --git a/encoding_helpers.go b/encoding_helpers.go
index 91f73a54c..486930d22 100644
--- a/encoding_helpers.go
+++ b/encoding_helpers.go
@@ -22,6 +22,7 @@ func (e *encbuf) putByte(c byte) { e.b = append(e.b, c) }
 
 func (e *encbuf) putBE32int(x int)      { e.putBE32(uint32(x)) }
 func (e *encbuf) putBE64int(x int)      { e.putBE64(uint64(x)) }
+func (e *encbuf) putBE64int64(x int64)  { e.putBE64(uint64(x)) }
 func (e *encbuf) putUvarint32(x uint32) { e.putUvarint64(uint64(x)) }
 func (e *encbuf) putUvarint(x int)      { e.putUvarint64(uint64(x)) }
 
@@ -71,8 +72,9 @@ type decbuf struct {
 	e error
 }
 
-func (d *decbuf) uvarint() int { return int(d.uvarint64()) }
-func (d *decbuf) be32int() int { return int(d.be32()) }
+func (d *decbuf) uvarint() int     { return int(d.uvarint64()) }
+func (d *decbuf) be32int() int     { return int(d.be32()) }
+func (d *decbuf) be64int64() int64 { return int64(d.be64()) }
 
 func (d *decbuf) uvarintStr() string {
 	l := d.uvarint64()
diff --git a/head.go b/head.go
index b71bbafc0..1c7e90d97 100644
--- a/head.go
+++ b/head.go
@@ -176,6 +176,7 @@ func (h *HeadBlock) Close() error {
 	return nil
 }
 
+// Meta implements headBlock.
 func (h *HeadBlock) Meta() BlockMeta {
 	m := BlockMeta{
 		ULID: h.meta.ULID,
@@ -192,11 +193,22 @@ func (h *HeadBlock) Meta() BlockMeta {
 	return m
 }
 
-func (h *HeadBlock) Dir() string        { return h.dir }
-func (h *HeadBlock) Persisted() bool    { return false }
-func (h *HeadBlock) Index() IndexReader { return &headIndexReader{h} }
+// Dir implements headBlock.
+func (h *HeadBlock) Dir() string { return h.dir }
+
+// Persisted implements headBlock.
+func (h *HeadBlock) Persisted() bool { return false }
+
+// Index implements headBlock.
+func (h *HeadBlock) Index() IndexReader { return &headIndexReader{h} }
+
+// Chunks implements headBlock.
 func (h *HeadBlock) Chunks() ChunkReader { return &headChunkReader{h} }
 
+// Delete implements headBlock. Deletes on the head block are not implemented yet.
+func (h *HeadBlock) Delete(int64, int64, ...labels.Matcher) error { return nil }
+
+// Querier implements Queryable and headBlock.
 func (h *HeadBlock) Querier(mint, maxt int64) Querier {
 	h.mtx.RLock()
 	defer h.mtx.RUnlock()
@@ -236,6 +248,7 @@ func (h *HeadBlock) Querier(mint, maxt int64) Querier {
 	}
 }
 
+// Appender implements headBlock.
 func (h *HeadBlock) Appender() Appender {
 	atomic.AddUint64(&h.activeWriters, 1)
 
@@ -247,6 +260,7 @@ func (h *HeadBlock) Appender() Appender {
 	return &headAppender{HeadBlock: h, samples: getHeadAppendBuffer()}
 }
 
+// Busy implements headBlock.
 func (h *HeadBlock) Busy() bool {
 	return atomic.LoadUint64(&h.activeWriters) > 0
 }
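The putBE64int64/be64int64 pair added to encoding_helpers.go encodes signed timestamps by casting through uint64. A quick standalone check that the cast round-trips for negative and boundary values (two's complement):

```go
package main

import "fmt"

func main() {
	// Negative and boundary timestamps survive the uint64 round-trip
	// used by putBE64int64 (write side) and be64int64 (read side).
	for _, x := range []int64{-1, 0, 1, -1 << 63, 1<<63 - 1} {
		u := uint64(x)                 // what putBE64int64 stores
		fmt.Println(x, int64(u) == x)  // what be64int64 recovers: always true
	}
}
```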
diff --git a/index.go b/index.go
index c0e96381f..051555dd2 100644
--- a/index.go
+++ b/index.go
@@ -537,6 +537,9 @@ type indexReader struct {
 	// Cached hashmaps of section offsets.
 	labels   map[string]uint32
 	postings map[string]uint32
+
+	// The underlying byte slice holding the tombstone data.
+	tomb []byte
 }
 
 var (
@@ -573,6 +576,12 @@ func newIndexReader(dir string) (*indexReader, error) {
 		return nil, errors.Wrap(err, "read postings table")
 	}
 
+	tf, err := openMmapFile(filepath.Join(dir, tombstoneFilename))
+	if err != nil {
+		return nil, errors.Wrap(err, "open tombstones file")
+	}
+	r.tomb = tf.b
+
 	return r, nil
 }
 
@@ -741,6 +750,18 @@ func (r *indexReader) Series(ref uint32) (labels.Labels, []*ChunkMeta, error) {
 		lbls = append(lbls, labels.Label{Name: ln, Value: lv})
 	}
 
+	// TODO: This sucks! Put the tombstones in a map instead.
+	tr := r.tombstones()
+	var dmint, dmaxt int64
+	del := false
+	if tr.Seek(ref) {
+		s := tr.At()
+		if s.ref == ref {
+			del = true
+			dmint, dmaxt = s.mint, s.maxt
+		}
+	}
+
 	// Read the chunks meta data.
 	k = int(d2.uvarint())
 	chunks := make([]*ChunkMeta, 0, k)
@@ -754,10 +775,14 @@ func (r *indexReader) Series(ref uint32) (labels.Labels, []*ChunkMeta, error) {
 			return nil, nil, errors.Wrapf(d2.err(), "read meta for chunk %d", i)
 		}
 
+		// TODO(gouthamve): Do not add the chunk if it is completely deleted.
 		chunks = append(chunks, &ChunkMeta{
 			Ref:     off,
 			MinTime: mint,
 			MaxTime: maxt,
+
+			deleted: del,
+			dranges: []trange{{dmint, dmaxt}},
 		})
 	}
 
@@ -789,6 +814,10 @@ func (r *indexReader) Postings(name, value string) (Postings, error) {
 	return newBigEndianPostings(d2.get()), nil
 }
 
+func (r *indexReader) tombstones() *tombstoneReader {
+	return newTombStoneReader(r.tomb[:])
+}
+
 type stringTuples struct {
 	l int      // tuple length
 	s []string // flattened tuple entries
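Paired with the writer in block.go, a block's tombstones file can be inspected with a short standalone reader. This sketch walks the fixed 20-byte records directly; the block directory name below is only illustrative:

```go
package main

import (
	"encoding/binary"
	"fmt"
	"io/ioutil"
	"log"
)

func main() {
	// Hypothetical block directory; blocks are named by ULID in practice.
	data, err := ioutil.ReadFile("data/01BKGV7JBM69T2G1BGBGM6KB12/tombstones")
	if err != nil {
		log.Fatal(err)
	}

	// Each record: BE uint32 series ref, BE int64 mint, BE int64 maxt.
	for i := 0; i+20 <= len(data); i += 20 {
		ref := binary.BigEndian.Uint32(data[i : i+4])
		mint := int64(binary.BigEndian.Uint64(data[i+4 : i+12]))
		maxt := int64(binary.BigEndian.Uint64(data[i+12 : i+20]))
		fmt.Printf("series ref %d deleted in [%d, %d]\n", ref, mint, maxt)
	}
}
```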