diff --git a/db.go b/db.go index ff1763762..72086fff7 100644 --- a/db.go +++ b/db.go @@ -448,9 +448,6 @@ func (db *DB) reload() (err error) { db.metrics.reloads.Inc() }() - var cs []io.Closer - defer func() { closeAll(cs...) }() - dirs, err := blockDirs(db.dir) if err != nil { return errors.Wrap(err, "find blocks") @@ -482,25 +479,25 @@ func (db *DB) reload() (err error) { return errors.Wrap(err, "invalid block sequence") } - // Close all opened blocks that no longer exist after we returned all locks. - // TODO(fabxc: probably races with querier still reading from them. Can - // we just abandon them and have the open FDs be GC'd automatically eventually? - for _, b := range db.blocks { - if _, ok := exist[b.Meta().ULID]; !ok { - cs = append(cs, b) - } - } - + // Swap in new blocks first for subsequently created readers to be seen. + // Then close previous blocks, which may block for pending readers to complete. db.mtx.Lock() + oldBlocks := db.blocks db.blocks = blocks db.mtx.Unlock() + for _, b := range oldBlocks { + if _, ok := exist[b.Meta().ULID]; !ok { + b.Close() + } + } + // Garbage collect data in the head if the most recent persisted block // covers data of its current time range. if len(blocks) == 0 { return nil } - maxt := blocks[len(db.blocks)-1].Meta().MaxTime + maxt := blocks[len(blocks)-1].Meta().MaxTime return errors.Wrap(db.head.Truncate(maxt), "head truncate failed") } @@ -593,7 +590,10 @@ func (db *DB) Snapshot(dir string) error { db.cmtx.Lock() defer db.cmtx.Unlock() - for _, b := range db.Blocks() { + db.mtx.RLock() + defer db.mtx.RUnlock() + + for _, b := range db.blocks { level.Info(db.logger).Log("msg", "snapshotting block", "block", b) if err := b.Snapshot(dir); err != nil { @@ -608,7 +608,10 @@ func (db *DB) Snapshot(dir string) error { func (db *DB) Querier(mint, maxt int64) (Querier, error) { var blocks []BlockReader - for _, b := range db.Blocks() { + db.mtx.RLock() + defer db.mtx.RUnlock() + + for _, b := range db.blocks { m := b.Meta() if intervalOverlap(mint, maxt, m.MinTime, m.MaxTime) { blocks = append(blocks, b) @@ -623,10 +626,15 @@ func (db *DB) Querier(mint, maxt int64) (Querier, error) { } for _, b := range blocks { q, err := NewBlockQuerier(b, mint, maxt) - if err != nil { - return nil, errors.Wrapf(err, "open querier for block %s", b) + if err == nil { + sq.blocks = append(sq.blocks, q) + continue } - sq.blocks = append(sq.blocks, q) + // If we fail, all previously opened queriers must be closed. + for _, q := range sq.blocks { + q.Close() + } + return nil, errors.Wrapf(err, "open querier for block %s", b) } return sq, nil } @@ -643,7 +651,10 @@ func (db *DB) Delete(mint, maxt int64, ms ...labels.Matcher) error { var g errgroup.Group - for _, b := range db.Blocks() { + db.mtx.RLock() + defer db.mtx.RUnlock() + + for _, b := range db.blocks { m := b.Meta() if intervalOverlap(mint, maxt, m.MinTime, m.MaxTime) { g.Go(func(b *Block) func() error { diff --git a/querier.go b/querier.go index 12e05e79d..97b98ea77 100644 --- a/querier.go +++ b/querier.go @@ -114,10 +114,13 @@ func NewBlockQuerier(b BlockReader, mint, maxt int64) (Querier, error) { } chunkr, err := b.Chunks() if err != nil { + indexr.Close() return nil, errors.Wrapf(err, "open chunk reader") } tombsr, err := b.Tombstones() if err != nil { + indexr.Close() + chunkr.Close() return nil, errors.Wrapf(err, "open tombstone reader") } return &blockQuerier{