diff --git a/block.go b/block.go index 7d4ad9365..7081db077 100644 --- a/block.go +++ b/block.go @@ -457,7 +457,7 @@ func (pb *Block) Delete(mint, maxt int64, ms ...labels.Matcher) error { ir := pb.indexr // Choose only valid postings which have chunks in the time-range. - stones := NewMemTombstones() + stones := newMemTombstones() var lset labels.Labels var chks []chunks.Meta @@ -565,7 +565,7 @@ func (pb *Block) Snapshot(dir string) error { return nil } -// Returns true if the block overlaps [mint, maxt]. +// OverlapsClosedInterval returns true if the block overlaps [mint, maxt]. func (pb *Block) OverlapsClosedInterval(mint, maxt int64) bool { // The block itself is a half-open interval // [pb.meta.MinTime, pb.meta.MaxTime). diff --git a/block_test.go b/block_test.go index c95bcd17d..61666fe38 100644 --- a/block_test.go +++ b/block_test.go @@ -70,7 +70,7 @@ func createEmptyBlock(t *testing.T, dir string, meta *BlockMeta) *Block { testutil.Ok(t, os.MkdirAll(chunkDir(dir), 0777)) - testutil.Ok(t, writeTombstoneFile(dir, NewMemTombstones())) + testutil.Ok(t, writeTombstoneFile(dir, newMemTombstones())) b, err := OpenBlock(dir, nil) testutil.Ok(t, err) diff --git a/compact.go b/compact.go index 0439ca310..ca027ad20 100644 --- a/compact.go +++ b/compact.go @@ -489,7 +489,7 @@ func (c *LeveledCompactor) write(dest string, meta *BlockMeta, blocks ...BlockRe } // Create an empty tombstones file. - if err := writeTombstoneFile(tmp, NewMemTombstones()); err != nil { + if err := writeTombstoneFile(tmp, newMemTombstones()); err != nil { return errors.Wrap(err, "write new tombstones file") } diff --git a/db_test.go b/db_test.go index 1996c6e8d..014a126c2 100644 --- a/db_test.go +++ b/db_test.go @@ -782,7 +782,7 @@ func TestTombstoneClean(t *testing.T) { } for _, b := range db.Blocks() { - testutil.Equals(t, NewMemTombstones(), b.tombstones) + testutil.Equals(t, newMemTombstones(), b.tombstones) } } } @@ -811,7 +811,7 @@ func TestTombstoneCleanFail(t *testing.T) { block := createEmptyBlock(t, blockDir, meta) // Add some some fake tombstones to trigger the compaction. - tomb := NewMemTombstones() + tomb := newMemTombstones() tomb.addInterval(0, Interval{0, 1}) block.tombstones = tomb diff --git a/head.go b/head.go index 30ab05b29..2adda313d 100644 --- a/head.go +++ b/head.go @@ -225,7 +225,7 @@ func NewHead(r prometheus.Registerer, l log.Logger, wal *wal.WAL, chunkRange int values: map[string]stringset{}, symbols: map[string]struct{}{}, postings: index.NewUnorderedMemPostings(), - tombstones: NewMemTombstones(), + tombstones: newMemTombstones(), } h.metrics = newHeadMetrics(h, r) diff --git a/head_test.go b/head_test.go index ea7d18c9b..7a68f4fce 100644 --- a/head_test.go +++ b/head_test.go @@ -338,7 +338,7 @@ func TestHeadDeleteSimple(t *testing.T) { Outer: for _, c := range cases { // Reset the tombstones. - head.tombstones = NewMemTombstones() + head.tombstones = newMemTombstones() // Delete the ranges. for _, r := range c.intervals { @@ -521,7 +521,7 @@ func TestDelete_e2e(t *testing.T) { } for _, del := range dels { // Reset the deletes everytime. - hb.tombstones = NewMemTombstones() + hb.tombstones = newMemTombstones() for _, r := range del.drange { testutil.Ok(t, hb.Delete(r.Mint, r.Maxt, del.ms...)) } diff --git a/mocks_test.go b/mocks_test.go index ab796f8b5..ee0cb9731 100644 --- a/mocks_test.go +++ b/mocks_test.go @@ -70,4 +70,4 @@ type mockBReader struct { func (r *mockBReader) Index() (IndexReader, error) { return r.ir, nil } func (r *mockBReader) Chunks() (ChunkReader, error) { return r.cr, nil } -func (r *mockBReader) Tombstones() (TombstoneReader, error) { return NewMemTombstones(), nil } +func (r *mockBReader) Tombstones() (TombstoneReader, error) { return newMemTombstones(), nil } diff --git a/querier.go b/querier.go index a4da8e689..e49ab9c00 100644 --- a/querier.go +++ b/querier.go @@ -478,7 +478,7 @@ type baseChunkSeries struct { // over them. It drops chunks based on tombstones in the given reader. func LookupChunkSeries(ir IndexReader, tr TombstoneReader, ms ...labels.Matcher) (ChunkSeriesSet, error) { if tr == nil { - tr = NewMemTombstones() + tr = newMemTombstones() } p, err := PostingsForMatchers(ir, ms...) if err != nil { diff --git a/querier_test.go b/querier_test.go index 3abcb847b..2eb5b037a 100644 --- a/querier_test.go +++ b/querier_test.go @@ -483,7 +483,7 @@ Outer: querier := &blockQuerier{ index: ir, chunks: cr, - tombstones: NewMemTombstones(), + tombstones: newMemTombstones(), mint: c.mint, maxt: c.maxt, @@ -756,7 +756,7 @@ func TestBaseChunkSeries(t *testing.T) { bcs := &baseChunkSeries{ p: index.NewListPostings(tc.postings), index: mi, - tombstones: NewMemTombstones(), + tombstones: newMemTombstones(), } i := 0 diff --git a/record.go b/record.go index c8cc7a504..364e8144d 100644 --- a/record.go +++ b/record.go @@ -26,22 +26,16 @@ import ( type RecordType uint8 const ( - RecordInvalid RecordType = 255 - RecordSeries RecordType = 1 - RecordSamples RecordType = 2 + // RecordInvalid is returned for unrecognised WAL record types. + RecordInvalid RecordType = 255 + // RecordSeries is used to match WAL records of type Series. + RecordSeries RecordType = 1 + // RecordSamples is used to match WAL records of type Samples. + RecordSamples RecordType = 2 + // RecordTombstones is used to match WAL records of type Tombstones. RecordTombstones RecordType = 3 ) -type RecordLogger interface { - Log(recs ...[]byte) error -} - -type RecordReader interface { - Next() bool - Err() error - Record() []byte -} - // RecordDecoder decodes series, sample, and tombstone records. // The zero value is ready to use. type RecordDecoder struct { diff --git a/tombstones.go b/tombstones.go index 21afd046a..a1f30b59c 100644 --- a/tombstones.go +++ b/tombstones.go @@ -113,10 +113,10 @@ type Stone struct { intervals Intervals } -func readTombstones(dir string) (*memTombstones, error) { +func readTombstones(dir string) (TombstoneReader, error) { b, err := ioutil.ReadFile(filepath.Join(dir, tombstoneFilename)) if os.IsNotExist(err) { - return NewMemTombstones(), nil + return newMemTombstones(), nil } else if err != nil { return nil, err } @@ -146,7 +146,7 @@ func readTombstones(dir string) (*memTombstones, error) { return nil, errors.New("checksum did not match") } - stonesMap := NewMemTombstones() + stonesMap := newMemTombstones() for d.len() > 0 { k := d.uvarint64() @@ -167,7 +167,9 @@ type memTombstones struct { mtx sync.RWMutex } -func NewMemTombstones() *memTombstones { +// newMemTombstones creates new in memory TombstoneReader +// that allows adding new intervals. +func newMemTombstones() *memTombstones { return &memTombstones{intvlGroups: make(map[uint64]Intervals)} } @@ -208,7 +210,7 @@ func (t *memTombstones) addInterval(ref uint64, itvs ...Interval) { } } -func (memTombstones) Close() error { +func (*memTombstones) Close() error { return nil } diff --git a/tombstones_test.go b/tombstones_test.go index 62bc06818..e12574f11 100644 --- a/tombstones_test.go +++ b/tombstones_test.go @@ -30,7 +30,7 @@ func TestWriteAndReadbackTombStones(t *testing.T) { ref := uint64(0) - stones := NewMemTombstones() + stones := newMemTombstones() // Generate the tombstones. for i := 0; i < 100; i++ { ref += uint64(rand.Int31n(10)) + 1 @@ -125,7 +125,7 @@ func TestAddingNewIntervals(t *testing.T) { // TestMemTombstonesConcurrency to make sure they are safe to access from different goroutines. func TestMemTombstonesConcurrency(t *testing.T) { - tomb := NewMemTombstones() + tomb := newMemTombstones() totalRuns := 100 var wg sync.WaitGroup wg.Add(2)