Use a *mapTombstoneReader instead of map
We need to recalculate the sorted ref list everytime we make a Tombstones() call. This avoids that. Signed-off-by: Goutham Veeramachaneni <cs14btech11014@iith.ac.in>
This commit is contained in:
parent
22c1b5b492
commit
45d3db4e9e
10
block.go
10
block.go
|
@ -163,8 +163,7 @@ type persistedBlock struct {
|
||||||
indexr *indexReader
|
indexr *indexReader
|
||||||
|
|
||||||
// For tombstones.
|
// For tombstones.
|
||||||
stones []uint32
|
tombstones *mapTombstoneReader
|
||||||
tombstones map[uint32][]trange
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func newPersistedBlock(dir string) (*persistedBlock, error) {
|
func newPersistedBlock(dir string) (*persistedBlock, error) {
|
||||||
|
@ -186,6 +185,7 @@ func newPersistedBlock(dir string) (*persistedBlock, error) {
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
||||||
ts := make(map[uint32][]trange)
|
ts := make(map[uint32][]trange)
|
||||||
for tr.Next() {
|
for tr.Next() {
|
||||||
s := tr.At()
|
s := tr.At()
|
||||||
|
@ -198,7 +198,7 @@ func newPersistedBlock(dir string) (*persistedBlock, error) {
|
||||||
chunkr: cr,
|
chunkr: cr,
|
||||||
indexr: ir,
|
indexr: ir,
|
||||||
|
|
||||||
tombstones: ts,
|
tombstones: newMapTombstoneReader(ts),
|
||||||
}
|
}
|
||||||
return pb, nil
|
return pb, nil
|
||||||
}
|
}
|
||||||
|
@ -229,7 +229,7 @@ func (pb *persistedBlock) Dir() string { return pb.dir }
|
||||||
func (pb *persistedBlock) Index() IndexReader { return pb.indexr }
|
func (pb *persistedBlock) Index() IndexReader { return pb.indexr }
|
||||||
func (pb *persistedBlock) Chunks() ChunkReader { return pb.chunkr }
|
func (pb *persistedBlock) Chunks() ChunkReader { return pb.chunkr }
|
||||||
func (pb *persistedBlock) Tombstones() TombstoneReader {
|
func (pb *persistedBlock) Tombstones() TombstoneReader {
|
||||||
return newMapTombstoneReader(pb.tombstones)
|
return pb.tombstones.Copy()
|
||||||
}
|
}
|
||||||
func (pb *persistedBlock) Meta() BlockMeta { return pb.meta }
|
func (pb *persistedBlock) Meta() BlockMeta { return pb.meta }
|
||||||
|
|
||||||
|
@ -270,7 +270,7 @@ Outer:
|
||||||
}
|
}
|
||||||
|
|
||||||
// Merge the current and new tombstones.
|
// Merge the current and new tombstones.
|
||||||
tr := newMapTombstoneReader(pb.tombstones)
|
tr := pb.tombstones.Copy()
|
||||||
str := newSimpleTombstoneReader(vPostings, []trange{{mint, maxt}})
|
str := newSimpleTombstoneReader(vPostings, []trange{{mint, maxt}})
|
||||||
tombreader := newMergedTombstoneReader(tr, str)
|
tombreader := newMergedTombstoneReader(tr, str)
|
||||||
|
|
||||||
|
|
21
head.go
21
head.go
|
@ -66,7 +66,7 @@ type HeadBlock struct {
|
||||||
values map[string]stringset // label names to possible values
|
values map[string]stringset // label names to possible values
|
||||||
postings *memPostings // postings lists for terms
|
postings *memPostings // postings lists for terms
|
||||||
|
|
||||||
tombstones map[uint32][]trange
|
tombstones *mapTombstoneReader
|
||||||
|
|
||||||
meta BlockMeta
|
meta BlockMeta
|
||||||
}
|
}
|
||||||
|
@ -120,7 +120,7 @@ func OpenHeadBlock(dir string, l log.Logger, wal WAL) (*HeadBlock, error) {
|
||||||
values: map[string]stringset{},
|
values: map[string]stringset{},
|
||||||
postings: &memPostings{m: make(map[term][]uint32)},
|
postings: &memPostings{m: make(map[term][]uint32)},
|
||||||
meta: *meta,
|
meta: *meta,
|
||||||
tombstones: make(map[uint32][]trange),
|
tombstones: emptyTombstoneReader,
|
||||||
}
|
}
|
||||||
return h, h.init()
|
return h, h.init()
|
||||||
}
|
}
|
||||||
|
@ -158,7 +158,8 @@ func (h *HeadBlock) init() error {
|
||||||
|
|
||||||
for tr.Next() {
|
for tr.Next() {
|
||||||
s := tr.At()
|
s := tr.At()
|
||||||
h.tombstones[s.ref] = s.ranges
|
h.tombstones.refs = append(h.tombstones.refs, s.ref)
|
||||||
|
h.tombstones.stones[s.ref] = s.ranges
|
||||||
}
|
}
|
||||||
return errors.Wrap(err, "tombstones reader iteration")
|
return errors.Wrap(err, "tombstones reader iteration")
|
||||||
}
|
}
|
||||||
|
@ -229,23 +230,19 @@ func (h *HeadBlock) Chunks() ChunkReader { return &headChunkReader{h} }
|
||||||
|
|
||||||
// Tombstones implements headBlock.
|
// Tombstones implements headBlock.
|
||||||
func (h *HeadBlock) Tombstones() TombstoneReader {
|
func (h *HeadBlock) Tombstones() TombstoneReader {
|
||||||
return newMapTombstoneReader(h.tombstones)
|
return h.tombstones.Copy()
|
||||||
}
|
}
|
||||||
|
|
||||||
// Delete implements headBlock.
|
// Delete implements headBlock.
|
||||||
func (h *HeadBlock) Delete(mint int64, maxt int64, ms ...labels.Matcher) error {
|
func (h *HeadBlock) Delete(mint int64, maxt int64, ms ...labels.Matcher) error {
|
||||||
h.mtx.RLock()
|
h.mtx.Lock() // We are modifying the tombstones here.
|
||||||
|
defer h.mtx.Unlock()
|
||||||
|
|
||||||
ir := h.Index()
|
ir := h.Index()
|
||||||
|
|
||||||
pr := newPostingsReader(ir)
|
pr := newPostingsReader(ir)
|
||||||
p, absent := pr.Select(ms...)
|
p, absent := pr.Select(ms...)
|
||||||
|
|
||||||
h.mtx.RUnlock()
|
|
||||||
|
|
||||||
h.mtx.Lock() // We are modifying the tombstones here.
|
|
||||||
defer h.mtx.Unlock()
|
|
||||||
|
|
||||||
Outer:
|
Outer:
|
||||||
for p.Next() {
|
for p.Next() {
|
||||||
ref := p.At()
|
ref := p.At()
|
||||||
|
@ -256,14 +253,14 @@ Outer:
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
h.tombstones[ref] = addNewInterval(h.tombstones[ref], trange{mint, maxt})
|
h.tombstones.stones[ref] = addNewInterval(h.tombstones.stones[ref], trange{mint, maxt})
|
||||||
}
|
}
|
||||||
|
|
||||||
if p.Err() != nil {
|
if p.Err() != nil {
|
||||||
return p.Err()
|
return p.Err()
|
||||||
}
|
}
|
||||||
|
|
||||||
return writeTombstoneFile(h.dir, newMapTombstoneReader(h.tombstones))
|
return writeTombstoneFile(h.dir, newMapTombstoneReader(h.tombstones.stones))
|
||||||
}
|
}
|
||||||
|
|
||||||
// Querier implements Queryable and headBlock.
|
// Querier implements Queryable and headBlock.
|
||||||
|
|
|
@ -218,7 +218,6 @@ type mapTombstoneReader struct {
|
||||||
stones map[uint32][]trange
|
stones map[uint32][]trange
|
||||||
}
|
}
|
||||||
|
|
||||||
// TODO(gouthamve): Take pre-sorted refs.
|
|
||||||
func newMapTombstoneReader(ts map[uint32][]trange) *mapTombstoneReader {
|
func newMapTombstoneReader(ts map[uint32][]trange) *mapTombstoneReader {
|
||||||
refs := make([]uint32, 0, len(ts))
|
refs := make([]uint32, 0, len(ts))
|
||||||
for k := range ts {
|
for k := range ts {
|
||||||
|
|
Loading…
Reference in New Issue