From f29fb62fbaf61c8c9430e8f0a4a04fa508f89b15 Mon Sep 17 00:00:00 2001 From: Goutham Veeramachaneni Date: Wed, 24 May 2017 11:24:24 +0530 Subject: [PATCH] Make TombstoneReader a Getter. Signed-off-by: Goutham Veeramachaneni --- block.go | 27 ++-- compact.go | 10 +- head.go | 16 +-- querier.go | 6 +- querier_test.go | 6 +- tombstones.go | 343 +++++---------------------------------------- tombstones_test.go | 231 +----------------------------- wal.go | 15 +- wal_test.go | 10 +- 9 files changed, 79 insertions(+), 585 deletions(-) diff --git a/block.go b/block.go index 45a63ae5e..82dfb6594 100644 --- a/block.go +++ b/block.go @@ -162,7 +162,7 @@ type persistedBlock struct { indexr *indexReader // For tombstones. - tombstones *mapTombstoneReader + tombstones tombstoneReader } func newPersistedBlock(dir string) (*persistedBlock, error) { @@ -180,25 +180,18 @@ func newPersistedBlock(dir string) (*persistedBlock, error) { return nil, err } - tr, err := readTombstoneFile(dir) + tr, err := readTombstones(dir) if err != nil { return nil, err } - ts := make(map[uint32]intervals) - for tr.Next() { - s := tr.At() - ts[s.ref] = s.intervals - } - pb := &persistedBlock{ dir: dir, meta: *meta, chunkr: cr, indexr: ir, - // TODO(gouthamve): We will be sorting the refs again internally, is it a big deal? - tombstones: newMapTombstoneReader(ts), + tombstones: tr, } return pb, nil } @@ -230,7 +223,7 @@ func (pb *persistedBlock) Dir() string { return pb.dir } func (pb *persistedBlock) Index() IndexReader { return pb.indexr } func (pb *persistedBlock) Chunks() ChunkReader { return pb.chunkr } func (pb *persistedBlock) Tombstones() TombstoneReader { - return pb.tombstones.Copy() + return pb.tombstones } func (pb *persistedBlock) Meta() BlockMeta { return pb.meta } @@ -277,16 +270,18 @@ Outer: } // Merge the current and new tombstones. - tr := pb.Tombstones() - str := newMapTombstoneReader(delStones) - tombreader := newMergedTombstoneReader(tr, str) + for k, v := range pb.tombstones { + for _, itv := range v { + delStones[k] = delStones[k].add(itv) + } + } + tombreader := newTombstoneReader(delStones) if err := writeTombstoneFile(pb.dir, tombreader); err != nil { return err } - // TODO(gouthamve): This counts any common tombstones too. But gives the same heuristic. - pb.meta.NumTombstones += int64(len(delStones)) + pb.meta.NumTombstones = int64(len(delStones)) return writeMetaFile(pb.dir, &pb.meta) } diff --git a/compact.go b/compact.go index 948e2f824..d139be026 100644 --- a/compact.go +++ b/compact.go @@ -432,14 +432,8 @@ func (c *compactionSeriesSet) Next() bool { return false } - if c.tombstones.Seek(c.p.At()) { - s := c.tombstones.At() - if c.p.At() == s.ref { - c.intervals = s.intervals - } else { - c.intervals = nil - } - } + c.intervals = c.tombstones.At(c.p.At()) + c.l, c.c, c.err = c.index.Series(c.p.At()) if c.err != nil { return false diff --git a/head.go b/head.go index 72aa67a6d..704c0b222 100644 --- a/head.go +++ b/head.go @@ -69,7 +69,7 @@ type HeadBlock struct { values map[string]stringset // label names to possible values postings *memPostings // postings lists for terms - tombstones *mapTombstoneReader + tombstones tombstoneReader meta BlockMeta } @@ -153,7 +153,7 @@ func (h *HeadBlock) init() error { deletesFunc := func(stones []stone) error { for _, s := range stones { for _, itv := range s.intervals { - h.tombstones.stones[s.ref] = h.tombstones.stones[s.ref].add(itv) + h.tombstones[s.ref] = h.tombstones[s.ref].add(itv) } } @@ -163,7 +163,6 @@ func (h *HeadBlock) init() error { if err := r.Read(seriesFunc, samplesFunc, deletesFunc); err != nil { return errors.Wrap(err, "consume WAL") } - h.tombstones = newMapTombstoneReader(h.tombstones.stones) return nil } @@ -221,7 +220,7 @@ func (h *HeadBlock) Meta() BlockMeta { // Tombstones returns the TombstoneReader against the block. func (h *HeadBlock) Tombstones() TombstoneReader { - return h.tombstones.Copy() + return h.tombstones } // Delete implements headBlock. @@ -257,16 +256,15 @@ Outer: if p.Err() != nil { return p.Err() } - if err := h.wal.LogDeletes(newMapTombstoneReader(newStones)); err != nil { + if err := h.wal.LogDeletes(newTombstoneReader(newStones)); err != nil { return err } for k, v := range newStones { - h.tombstones.stones[k] = h.tombstones.stones[k].add(v[0]) + h.tombstones[k] = h.tombstones[k].add(v[0]) } - h.tombstones = newMapTombstoneReader(h.tombstones.stones) - h.meta.NumTombstones = int64(len(h.tombstones.stones)) + h.meta.NumTombstones = int64(len(h.tombstones)) return nil } @@ -296,7 +294,7 @@ func (h *HeadBlock) Querier(mint, maxt int64) Querier { maxt: maxt, index: h.Index(), chunks: h.Chunks(), - tombstones: h.Tombstones().Copy(), + tombstones: h.Tombstones(), postingsMapper: func(p Postings) Postings { ep := make([]uint32, 0, 64) diff --git a/querier.go b/querier.go index 28bf3d21a..49cd013f4 100644 --- a/querier.go +++ b/querier.go @@ -151,7 +151,7 @@ func (q *blockQuerier) Select(ms ...labels.Matcher) SeriesSet { index: q.index, absent: absent, - tombstones: q.tombstones.Copy(), + tombstones: q.tombstones, }, chunks: q.chunks, mint: q.mint, @@ -412,9 +412,9 @@ Outer: s.lset = lset s.chks = chunks - if s.tombstones.Seek(ref) && s.tombstones.At().ref == ref { - s.intervals = s.tombstones.At().intervals + s.intervals = s.tombstones.At(s.p.At()) + if len(s.intervals) > 0 { // Only those chunks that are not entirely deleted. chks := make([]*ChunkMeta, 0, len(s.chks)) for _, chk := range s.chks { diff --git a/querier_test.go b/querier_test.go index d63c36d93..e03570fad 100644 --- a/querier_test.go +++ b/querier_test.go @@ -432,7 +432,7 @@ func TestBlockQuerierDelete(t *testing.T) { chunks [][]sample } - tombstones *mapTombstoneReader + tombstones tombstoneReader queries []query }{ data: []struct { @@ -480,7 +480,7 @@ func TestBlockQuerierDelete(t *testing.T) { }, }, }, - tombstones: newMapTombstoneReader( + tombstones: newTombstoneReader( map[uint32]intervals{ 1: intervals{{1, 3}}, 2: intervals{{1, 3}, {6, 10}}, @@ -553,7 +553,7 @@ Outer: querier := &blockQuerier{ index: ir, chunks: cr, - tombstones: cases.tombstones.Copy(), + tombstones: cases.tombstones, mint: c.mint, maxt: c.maxt, diff --git a/tombstones.go b/tombstones.go index ef72b4c28..9e6c39931 100644 --- a/tombstones.go +++ b/tombstones.go @@ -7,7 +7,6 @@ import ( "io/ioutil" "os" "path/filepath" - "sort" ) const tombstoneFilename = "tombstones" @@ -19,11 +18,7 @@ const ( tombstoneFormatV1 = 1 ) -func readTombstoneFile(dir string) (TombstoneReader, error) { - return newTombStoneReader(dir) -} - -func writeTombstoneFile(dir string, tr TombstoneReader) error { +func writeTombstoneFile(dir string, tr tombstoneReader) error { path := filepath.Join(dir, tombstoneFilename) tmp := path + ".tmp" hash := crc32.New(crc32.MakeTable(crc32.Castagnoli)) @@ -48,15 +43,13 @@ func writeTombstoneFile(dir string, tr TombstoneReader) error { } pos += int64(n) - for tr.Next() { - s := tr.At() - - refs = append(refs, s.ref) - stoneOff[s.ref] = pos + for k, v := range tr { + refs = append(refs, k) + stoneOff[k] = pos // Write the ranges. buf.reset() - buf.putUvarint(len(s.intervals)) + buf.putUvarint(len(v)) n, err := f.Write(buf.get()) if err != nil { return err @@ -64,7 +57,7 @@ func writeTombstoneFile(dir string, tr TombstoneReader) error { pos += int64(n) buf.reset() - for _, r := range s.intervals { + for _, r := range v { buf.putVarint64(r.mint) buf.putVarint64(r.maxt) } @@ -76,9 +69,6 @@ func writeTombstoneFile(dir string, tr TombstoneReader) error { } pos += int64(n) } - if err := tr.Err(); err != nil { - return err - } // Write the offset table. // Pad first. @@ -130,25 +120,10 @@ type stone struct { // TombstoneReader is the iterator over tombstones. type TombstoneReader interface { - Next() bool - Seek(ref uint32) bool - At() stone - // Copy copies the current reader state. Changes to the copy will not affect parent. - Copy() TombstoneReader - Err() error + At(ref uint32) intervals } -type tombstoneReader struct { - stones []byte - - cur stone - - b []byte - err error -} - -func newTombStoneReader(dir string) (*tombstoneReader, error) { - // TODO(gouthamve): MMAP? +func readTombstones(dir string) (tombstoneReader, error) { b, err := ioutil.ReadFile(filepath.Join(dir, tombstoneFilename)) if err != nil { return nil, err @@ -173,292 +148,50 @@ func newTombStoneReader(dir string) (*tombstoneReader, error) { } off += 4 // For the numStones which has been read. - return &tombstoneReader{ - stones: b[off : off+int64(numStones*12)], + stones := b[off : off+int64(numStones*12)] + stonesMap := make(map[uint32]intervals) + for len(stones) >= 12 { + d := &decbuf{b: stones[:12]} + ref := d.be32() + off := d.be64int64() - b: b, - }, nil -} - -func (t *tombstoneReader) Next() bool { - if t.err != nil { - return false - } - - if len(t.stones) < 12 { - return false - } - - d := &decbuf{b: t.stones[:12]} - ref := d.be32() - off := d.be64int64() - - d = &decbuf{b: t.b[off:]} - numRanges := d.uvarint() - if err := d.err(); err != nil { - t.err = err - return false - } - - dranges := make(intervals, 0, numRanges) - for i := 0; i < int(numRanges); i++ { - mint := d.varint64() - maxt := d.varint64() + d = &decbuf{b: b[off:]} + numRanges := d.uvarint() if err := d.err(); err != nil { - t.err = err - return false + return nil, err } - dranges = append(dranges, interval{mint, maxt}) - } + dranges := make(intervals, 0, numRanges) + for i := 0; i < int(numRanges); i++ { + mint := d.varint64() + maxt := d.varint64() + if err := d.err(); err != nil { + return nil, err + } - // TODO(gouthamve): Verify checksum. - t.stones = t.stones[12:] - t.cur = stone{ref: ref, intervals: dranges} - return true -} - -func (t *tombstoneReader) Seek(ref uint32) bool { - i := sort.Search(len(t.stones)/12, func(i int) bool { - x := binary.BigEndian.Uint32(t.stones[i*12:]) - return x >= ref - }) - - if i*12 < len(t.stones) { - t.stones = t.stones[i*12:] - return t.Next() - } - - t.stones = nil - return false -} - -func (t *tombstoneReader) At() stone { - return t.cur -} - -func (t *tombstoneReader) Copy() TombstoneReader { - return &tombstoneReader{ - stones: t.stones[:], - cur: t.cur, - - b: t.b, - } -} - -func (t *tombstoneReader) Err() error { - return t.err -} - -type mapTombstoneReader struct { - refs []uint32 - cur uint32 - - stones map[uint32]intervals -} - -func newMapTombstoneReader(ts map[uint32]intervals) *mapTombstoneReader { - refs := make([]uint32, 0, len(ts)) - for k := range ts { - refs = append(refs, k) - } - - sort.Sort(uint32slice(refs)) - return &mapTombstoneReader{stones: ts, refs: refs} -} - -func newEmptyTombstoneReader() *mapTombstoneReader { - return &mapTombstoneReader{stones: make(map[uint32]intervals)} -} - -func (t *mapTombstoneReader) Next() bool { - if len(t.refs) > 0 { - t.cur = t.refs[0] - t.refs = t.refs[1:] - return true - } - - t.cur = 0 - return false -} - -func (t *mapTombstoneReader) Seek(ref uint32) bool { - // If the current value satisfies, then return. - if t.cur >= ref { - return true - } - - // Do binary search between current position and end. - i := sort.Search(len(t.refs), func(i int) bool { - return t.refs[i] >= ref - }) - if i < len(t.refs) { - t.cur = t.refs[i] - t.refs = t.refs[i+1:] - return true - } - t.refs = nil - return false -} - -func (t *mapTombstoneReader) At() stone { - return stone{ref: t.cur, intervals: t.stones[t.cur]} -} - -func (t *mapTombstoneReader) Copy() TombstoneReader { - return &mapTombstoneReader{ - refs: t.refs[:], - cur: t.cur, - - stones: t.stones, - } -} - -func (t *mapTombstoneReader) Err() error { - return nil -} - -type simpleTombstoneReader struct { - refs []uint32 - cur uint32 - - intervals intervals -} - -func newSimpleTombstoneReader(refs []uint32, dranges intervals) *simpleTombstoneReader { - return &simpleTombstoneReader{refs: refs, intervals: dranges} -} - -func (t *simpleTombstoneReader) Next() bool { - if len(t.refs) > 0 { - t.cur = t.refs[0] - t.refs = t.refs[1:] - return true - } - - t.cur = 0 - return false -} - -func (t *simpleTombstoneReader) Seek(ref uint32) bool { - // If the current value satisfies, then return. - if t.cur >= ref { - return true - } - - // Do binary search between current position and end. - i := sort.Search(len(t.refs), func(i int) bool { - return t.refs[i] >= ref - }) - if i < len(t.refs) { - t.cur = t.refs[i] - t.refs = t.refs[i+1:] - return true - } - t.refs = nil - return false -} - -func (t *simpleTombstoneReader) At() stone { - return stone{ref: t.cur, intervals: t.intervals} -} - -func (t *simpleTombstoneReader) Copy() TombstoneReader { - return &simpleTombstoneReader{refs: t.refs[:], cur: t.cur, intervals: t.intervals} -} - -func (t *simpleTombstoneReader) Err() error { - return nil -} - -type mergedTombstoneReader struct { - a, b TombstoneReader - cur stone - - initialized bool - aok, bok bool -} - -func newMergedTombstoneReader(a, b TombstoneReader) *mergedTombstoneReader { - return &mergedTombstoneReader{a: a, b: b} -} - -func (t *mergedTombstoneReader) Next() bool { - if !t.initialized { - t.aok = t.a.Next() - t.bok = t.b.Next() - t.initialized = true - } - - if !t.aok && !t.bok { - return false - } - - if !t.aok { - t.cur = t.b.At() - t.bok = t.b.Next() - return true - } - if !t.bok { - t.cur = t.a.At() - t.aok = t.a.Next() - return true - } - - acur, bcur := t.a.At(), t.b.At() - - if acur.ref < bcur.ref { - t.cur = acur - t.aok = t.a.Next() - } else if acur.ref > bcur.ref { - t.cur = bcur - t.bok = t.b.Next() - } else { - // Merge time ranges. - for _, r := range bcur.intervals { - acur.intervals = acur.intervals.add(r) + dranges = append(dranges, interval{mint, maxt}) } - t.cur = acur - t.aok = t.a.Next() - t.bok = t.b.Next() - } - return true -} - -func (t *mergedTombstoneReader) Seek(ref uint32) bool { - if t.cur.ref >= ref { - return true + // TODO(gouthamve): Verify checksum. + stones = stones[12:] + stonesMap[ref] = dranges } - t.aok = t.a.Seek(ref) - t.bok = t.b.Seek(ref) - t.initialized = true - - return t.Next() -} -func (t *mergedTombstoneReader) At() stone { - return t.cur + return newTombstoneReader(stonesMap), nil } -func (t *mergedTombstoneReader) Copy() TombstoneReader { - return &mergedTombstoneReader{ - a: t.a.Copy(), - b: t.b.Copy(), +type tombstoneReader map[uint32]intervals - cur: t.cur, - - initialized: t.initialized, - aok: t.aok, - bok: t.bok, - } +func newTombstoneReader(ts map[uint32]intervals) tombstoneReader { + return tombstoneReader(ts) } -func (t *mergedTombstoneReader) Err() error { - if t.a.Err() != nil { - return t.a.Err() - } - return t.b.Err() +func newEmptyTombstoneReader() tombstoneReader { + return tombstoneReader(make(map[uint32]intervals)) +} + +func (t tombstoneReader) At(ref uint32) intervals { + return t[ref] } type interval struct { diff --git a/tombstones_test.go b/tombstones_test.go index 1807651c0..525d825f7 100644 --- a/tombstones_test.go +++ b/tombstones_test.go @@ -4,7 +4,6 @@ import ( "io/ioutil" "math/rand" "os" - "sort" "testing" "time" @@ -31,20 +30,13 @@ func TestWriteAndReadbackTombStones(t *testing.T) { stones[ref] = dranges } - require.NoError(t, writeTombstoneFile(tmpdir, newMapTombstoneReader(stones))) + require.NoError(t, writeTombstoneFile(tmpdir, newTombstoneReader(stones))) - restr, err := readTombstoneFile(tmpdir) + restr, err := readTombstones(tmpdir) require.NoError(t, err) - exptr := newMapTombstoneReader(stones) + exptr := newTombstoneReader(stones) // Compare the two readers. - for restr.Next() { - require.True(t, exptr.Next()) - - require.Equal(t, exptr.At(), restr.At()) - } - require.False(t, exptr.Next()) - require.NoError(t, restr.Err()) - require.NoError(t, exptr.Err()) + require.Equal(t, restr, exptr) } func TestAddingNewIntervals(t *testing.T) { @@ -116,218 +108,3 @@ func TestAddingNewIntervals(t *testing.T) { } return } - -func TestTombstoneReadersSeek(t *testing.T) { - // This is assuming that the listPostings is perfect. - table := struct { - m map[uint32]intervals - - cases []uint32 - }{ - m: map[uint32]intervals{ - 2: intervals{{1, 2}}, - 3: intervals{{1, 4}, {5, 6}}, - 4: intervals{{10, 15}, {16, 20}}, - 5: intervals{{1, 4}, {5, 6}}, - 50: intervals{{10, 20}, {35, 50}}, - 600: intervals{{100, 2000}}, - 1000: intervals{}, - 1500: intervals{{10000, 500000}}, - 1600: intervals{{1, 2}, {3, 4}, {4, 5}, {6, 7}}, - }, - - cases: []uint32{1, 10, 20, 40, 30, 20, 50, 599, 601, 1000, 1600, 1601, 2000}, - } - - testFunc := func(t *testing.T, tr TombstoneReader) { - for _, ref := range table.cases { - // Create the listPostings. - refs := make([]uint32, 0, len(table.m)) - for k := range table.m { - refs = append(refs, k) - } - sort.Sort(uint32slice(refs)) - pr := newListPostings(refs) - - // Compare both. - trc := tr.Copy() - require.Equal(t, pr.Seek(ref), trc.Seek(ref)) - if pr.Seek(ref) { - require.Equal(t, pr.At(), trc.At().ref) - require.Equal(t, table.m[pr.At()], trc.At().intervals) - } - - for pr.Next() { - require.True(t, trc.Next()) - require.Equal(t, pr.At(), trc.At().ref) - require.Equal(t, table.m[pr.At()], trc.At().intervals) - } - - require.False(t, trc.Next()) - require.NoError(t, pr.Err()) - require.NoError(t, tr.Err()) - } - } - - t.Run("tombstoneReader", func(t *testing.T) { - tmpdir, _ := ioutil.TempDir("", "test") - defer os.RemoveAll(tmpdir) - - mtr := newMapTombstoneReader(table.m) - writeTombstoneFile(tmpdir, mtr) - tr, err := readTombstoneFile(tmpdir) - require.NoError(t, err) - - testFunc(t, tr) - return - }) - t.Run("mapTombstoneReader", func(t *testing.T) { - mtr := newMapTombstoneReader(table.m) - - testFunc(t, mtr) - return - }) - t.Run("simpleTombstoneReader", func(t *testing.T) { - dranges := intervals{{1, 2}, {3, 4}, {5, 6}} - - for _, ref := range table.cases { - // Create the listPostings. - refs := make([]uint32, 0, len(table.m)) - for k := range table.m { - refs = append(refs, k) - } - sort.Sort(uint32slice(refs)) - pr := newListPostings(refs[:]) - tr := newSimpleTombstoneReader(refs[:], dranges) - - // Compare both. - trc := tr.Copy() - require.Equal(t, pr.Seek(ref), trc.Seek(ref)) - if pr.Seek(ref) { - require.Equal(t, pr.At(), trc.At().ref) - require.Equal(t, dranges, tr.At().intervals) - } - for pr.Next() { - require.True(t, trc.Next()) - require.Equal(t, pr.At(), trc.At().ref, "refs") - require.Equal(t, dranges, trc.At().intervals) - } - - require.False(t, trc.Next()) - require.NoError(t, pr.Err()) - require.NoError(t, tr.Err()) - } - return - }) -} - -func TestMergedTombstoneReader(t *testing.T) { - cases := []struct { - a, b TombstoneReader - - exp TombstoneReader - }{ - { - a: newMapTombstoneReader( - map[uint32]intervals{ - 2: intervals{{1, 2}}, - 3: intervals{{1, 4}, {6, 6}}, - 4: intervals{{10, 15}, {16, 20}}, - 5: intervals{{1, 4}, {5, 6}}, - 50: intervals{{10, 20}, {35, 50}}, - 600: intervals{{100, 2000}}, - 1000: intervals{}, - 1500: intervals{{10000, 500000}}, - 1600: intervals{{1, 2}, {3, 4}, {4, 5}, {6, 7}}, - }, - ), - b: newMapTombstoneReader( - map[uint32]intervals{ - 2: intervals{{1, 2}}, - 3: intervals{{5, 6}}, - 4: intervals{{10, 15}, {16, 20}}, - 5: intervals{{1, 4}, {5, 6}}, - 50: intervals{{10, 20}, {35, 50}}, - 600: intervals{{100, 2000}}, - 1000: intervals{}, - 1500: intervals{{10000, 500000}}, - 1600: intervals{{1, 2}, {3, 4}, {4, 5}, {6, 7}}, - }, - ), - - exp: newMapTombstoneReader( - map[uint32]intervals{ - 2: intervals{{1, 2}}, - 3: intervals{{1, 6}}, - 4: intervals{{10, 20}}, - 5: intervals{{1, 6}}, - 50: intervals{{10, 20}, {35, 50}}, - 600: intervals{{100, 2000}}, - 1000: intervals{}, - 1500: intervals{{10000, 500000}}, - 1600: intervals{{1, 7}}, - }, - ), - }, - { - a: newMapTombstoneReader( - map[uint32]intervals{ - 2: intervals{{1, 2}}, - 3: intervals{{1, 4}, {6, 6}}, - 4: intervals{{10, 15}, {17, 20}}, - 5: intervals{{1, 6}}, - 50: intervals{{10, 20}, {35, 50}}, - 600: intervals{{100, 2000}}, - 1000: intervals{}, - 1500: intervals{{10000, 500000}}, - 1600: intervals{{1, 2}, {3, 4}, {4, 5}, {6, 7}}, - }, - ), - b: newMapTombstoneReader( - map[uint32]intervals{ - 20: intervals{{1, 2}}, - 30: intervals{{1, 4}, {5, 6}}, - 40: intervals{{10, 15}, {16, 20}}, - 60: intervals{{1, 4}, {5, 6}}, - 500: intervals{{10, 20}, {35, 50}}, - 6000: intervals{{100, 2000}}, - 10000: intervals{}, - 15000: intervals{{10000, 500000}}, - 1600: intervals{{1, 2}, {3, 4}, {4, 5}, {6, 7}}, - }, - ), - - exp: newMapTombstoneReader( - map[uint32]intervals{ - 2: intervals{{1, 2}}, - 3: intervals{{1, 4}, {6, 6}}, - 4: intervals{{10, 15}, {17, 20}}, - 5: intervals{{1, 6}}, - 50: intervals{{10, 20}, {35, 50}}, - 600: intervals{{100, 2000}}, - 1000: intervals{}, - 1500: intervals{{10000, 500000}}, - 20: intervals{{1, 2}}, - 30: intervals{{1, 4}, {5, 6}}, - 40: intervals{{10, 15}, {16, 20}}, - 60: intervals{{1, 4}, {5, 6}}, - 500: intervals{{10, 20}, {35, 50}}, - 6000: intervals{{100, 2000}}, - 10000: intervals{}, - 15000: intervals{{10000, 500000}}, - 1600: intervals{{1, 7}}, - }, - ), - }, - } - - for _, c := range cases { - res := newMergedTombstoneReader(c.a, c.b) - for c.exp.Next() { - require.True(t, res.Next()) - require.Equal(t, c.exp.At(), res.At()) - } - require.False(t, res.Next()) - } - return -} diff --git a/wal.go b/wal.go index 0e503e7b8..831a6f7e5 100644 --- a/wal.go +++ b/wal.go @@ -83,7 +83,7 @@ type WAL interface { Reader() WALReader LogSeries([]labels.Labels) error LogSamples([]RefSample) error - LogDeletes(TombstoneReader) error + LogDeletes(tombstoneReader) error Close() error } @@ -180,7 +180,7 @@ func (w *SegmentWAL) LogSamples(samples []RefSample) error { } // LogDeletes write a batch of new deletes to the log. -func (w *SegmentWAL) LogDeletes(tr TombstoneReader) error { +func (w *SegmentWAL) LogDeletes(tr tombstoneReader) error { if err := w.encodeDeletes(tr); err != nil { return err } @@ -483,17 +483,16 @@ func (w *SegmentWAL) encodeSamples(samples []RefSample) error { return w.entry(WALEntrySamples, walSamplesSimple, buf) } -func (w *SegmentWAL) encodeDeletes(tr TombstoneReader) error { +func (w *SegmentWAL) encodeDeletes(tr tombstoneReader) error { b := make([]byte, 2*binary.MaxVarintLen64) eb := &encbuf{b: b} buf := getWALBuffer() - for tr.Next() { + for k, v := range tr { eb.reset() - s := tr.At() - eb.putUvarint32(s.ref) - eb.putUvarint(len(s.intervals)) + eb.putUvarint32(k) + eb.putUvarint(len(v)) buf = append(buf, eb.get()...) - for _, itv := range s.intervals { + for _, itv := range v { eb.reset() eb.putVarint64(itv.mint) eb.putVarint64(itv.maxt) diff --git a/wal_test.go b/wal_test.go index 3f622df72..605f3d8e6 100644 --- a/wal_test.go +++ b/wal_test.go @@ -189,7 +189,6 @@ func TestSegmentWAL_Log_Restore(t *testing.T) { return nil } - // TODO: Add this. delf := func(stones []stone) error { if len(stones) > 0 { cstones := make([]stone, len(stones)) @@ -230,7 +229,7 @@ func TestSegmentWAL_Log_Restore(t *testing.T) { require.NoError(t, w.LogSeries(lbls)) require.NoError(t, w.LogSamples(samples)) - require.NoError(t, w.LogDeletes(newMapTombstoneReader(stones))) + require.NoError(t, w.LogDeletes(newTombstoneReader(stones))) if len(lbls) > 0 { recordedSeries = append(recordedSeries, lbls) @@ -240,12 +239,11 @@ func TestSegmentWAL_Log_Restore(t *testing.T) { totalSamples += len(samples) } if len(stones) > 0 { - tr := newMapTombstoneReader(stones) + tr := newTombstoneReader(stones) newdels := []stone{} - for tr.Next() { - newdels = append(newdels, tr.At()) + for k, v := range tr { + newdels = append(newdels, stone{k, v}) } - require.NoError(t, tr.Err()) recordedDeletes = append(recordedDeletes, newdels) }