Actually close olds blocks in reloadBlocks

This fixes a bug leaking memory because blocks were not actually closed
as the closing call references the initial, empty slice
This commit is contained in:
Fabian Reinartz 2017-03-23 18:27:20 +01:00
parent 70909ca8ad
commit e478d0e3bc
2 changed files with 27 additions and 12 deletions

4
db.go
View File

@ -366,7 +366,7 @@ func (db *DB) seqBlock(i int) (Block, bool) {
func (db *DB) reloadBlocks() error {
var cs []io.Closer
defer closeAll(cs...)
defer func() { closeAll(cs...) }()
db.mtx.Lock()
defer db.mtx.Unlock()
@ -423,7 +423,7 @@ func (db *DB) reloadBlocks() error {
// Close all blocks that we no longer need. They are closed after returning all
// locks to avoid questionable locking order.
for _, b := range db.blocks {
if nb := seqBlocks[b.Meta().Sequence]; nb != b {
if nb, ok := seqBlocks[b.Meta().Sequence]; !ok || nb != b {
cs = append(cs, b)
}
}

33
head.go
View File

@ -53,7 +53,6 @@ type headBlock struct {
values map[string]stringset // label names to possible values
postings *memPostings // postings lists for terms
metamtx sync.RWMutex
meta BlockMeta
}
@ -109,6 +108,7 @@ func openHeadBlock(dir string, l log.Logger) (*headBlock, error) {
r := wal.Reader()
Outer:
for r.Next() {
series, samples := r.At()
@ -117,6 +117,10 @@ func openHeadBlock(dir string, l log.Logger) (*headBlock, error) {
h.meta.Stats.NumSeries++
}
for _, s := range samples {
if int(s.ref) >= len(h.series) {
l.Log("msg", "unknown series reference, abort WAL restore", "got", s.ref, "max", len(h.series)-1)
break Outer
}
h.series[s.ref].append(s.t, s.v)
if !h.inBounds(s.t) {
@ -168,10 +172,19 @@ func (h *headBlock) Close() error {
}
func (h *headBlock) Meta() BlockMeta {
h.metamtx.RLock()
defer h.metamtx.RUnlock()
m := BlockMeta{
ULID: h.meta.ULID,
Sequence: h.meta.Sequence,
MinTime: h.meta.MinTime,
MaxTime: h.meta.MaxTime,
Compaction: h.meta.Compaction,
}
return h.meta
m.Stats.NumChunks = atomic.LoadUint64(&h.meta.Stats.NumChunks)
m.Stats.NumSeries = atomic.LoadUint64(&h.meta.Stats.NumSeries)
m.Stats.NumSamples = atomic.LoadUint64(&h.meta.Stats.NumSamples)
return m
}
func (h *headBlock) Dir() string { return h.dir }
@ -199,6 +212,11 @@ func (h *headBlock) Querier(mint, maxt int64) Querier {
ep := make([]uint32, 0, 64)
for p.Next() {
// Skip posting entries that include series added after we
// instantiated the querier.
if int(p.At()) >= len(series) {
break
}
ep = append(ep, p.At())
}
if err := p.Err(); err != nil {
@ -413,11 +431,8 @@ func (a *headAppender) Commit() error {
a.mtx.RUnlock()
a.metamtx.Lock()
defer a.metamtx.Unlock()
a.meta.Stats.NumSamples += total
a.meta.Stats.NumSeries += uint64(len(a.newSeries))
atomic.AddUint64(&a.meta.Stats.NumSamples, total)
atomic.AddUint64(&a.meta.Stats.NumSeries, uint64(len(a.newSeries)))
return nil
}