From f8e88bfdb75946479149969a4ad92caf0fc80225 Mon Sep 17 00:00:00 2001 From: Fabian Reinartz Date: Mon, 23 Oct 2017 20:30:03 +0200 Subject: [PATCH] Close previous block queriers on error This ensures we close all previously opened queriers if one of the block queriers fails to open. Also swap in new blocks before closing old ones to avoid the situation in general. Make read locking of blocks more conservative to avoid unnecessary retries by clients, e.g. when blocks are getting closed before we can successfully instantiate queriers against them. --- db.go | 49 ++++++++++++++++++++++++++++++------------------- querier.go | 3 +++ 2 files changed, 33 insertions(+), 19 deletions(-) diff --git a/db.go b/db.go index ff1763762..72086fff7 100644 --- a/db.go +++ b/db.go @@ -448,9 +448,6 @@ func (db *DB) reload() (err error) { db.metrics.reloads.Inc() }() - var cs []io.Closer - defer func() { closeAll(cs...) }() - dirs, err := blockDirs(db.dir) if err != nil { return errors.Wrap(err, "find blocks") } @@ -482,25 +479,25 @@ func (db *DB) reload() (err error) { return errors.Wrap(err, "invalid block sequence") } - // Close all opened blocks that no longer exist after we returned all locks. - // TODO(fabxc: probably races with querier still reading from them. Can - // we just abandon them and have the open FDs be GC'd automatically eventually? - for _, b := range db.blocks { - if _, ok := exist[b.Meta().ULID]; !ok { - cs = append(cs, b) - } - } - + // Swap in new blocks first for subsequently created readers to be seen. + // Then close previous blocks, which may block for pending readers to complete. db.mtx.Lock() + oldBlocks := db.blocks db.blocks = blocks db.mtx.Unlock() + for _, b := range oldBlocks { + if _, ok := exist[b.Meta().ULID]; !ok { + b.Close() + } + } + // Garbage collect data in the head if the most recent persisted block // covers data of its current time range. 
if len(blocks) == 0 { return nil } - maxt := blocks[len(db.blocks)-1].Meta().MaxTime + maxt := blocks[len(blocks)-1].Meta().MaxTime return errors.Wrap(db.head.Truncate(maxt), "head truncate failed") } @@ -593,7 +590,10 @@ func (db *DB) Snapshot(dir string) error { db.cmtx.Lock() defer db.cmtx.Unlock() - for _, b := range db.Blocks() { + db.mtx.RLock() + defer db.mtx.RUnlock() + + for _, b := range db.blocks { level.Info(db.logger).Log("msg", "snapshotting block", "block", b) if err := b.Snapshot(dir); err != nil { @@ -608,7 +608,10 @@ func (db *DB) Snapshot(dir string) error { func (db *DB) Querier(mint, maxt int64) (Querier, error) { var blocks []BlockReader - for _, b := range db.Blocks() { + db.mtx.RLock() + defer db.mtx.RUnlock() + + for _, b := range db.blocks { m := b.Meta() if intervalOverlap(mint, maxt, m.MinTime, m.MaxTime) { blocks = append(blocks, b) @@ -623,10 +626,15 @@ func (db *DB) Querier(mint, maxt int64) (Querier, error) { } for _, b := range blocks { q, err := NewBlockQuerier(b, mint, maxt) - if err != nil { - return nil, errors.Wrapf(err, "open querier for block %s", b) + if err == nil { + sq.blocks = append(sq.blocks, q) + continue } - sq.blocks = append(sq.blocks, q) + // If we fail, all previously opened queriers must be closed. 
+ for _, q := range sq.blocks { + q.Close() + } + return nil, errors.Wrapf(err, "open querier for block %s", b) } return sq, nil } @@ -643,7 +651,10 @@ func (db *DB) Delete(mint, maxt int64, ms ...labels.Matcher) error { var g errgroup.Group - for _, b := range db.Blocks() { + db.mtx.RLock() + defer db.mtx.RUnlock() + + for _, b := range db.blocks { m := b.Meta() if intervalOverlap(mint, maxt, m.MinTime, m.MaxTime) { g.Go(func(b *Block) func() error { diff --git a/querier.go b/querier.go index 12e05e79d..97b98ea77 100644 --- a/querier.go +++ b/querier.go @@ -114,10 +114,13 @@ func NewBlockQuerier(b BlockReader, mint, maxt int64) (Querier, error) { } chunkr, err := b.Chunks() if err != nil { + indexr.Close() return nil, errors.Wrapf(err, "open chunk reader") } tombsr, err := b.Tombstones() if err != nil { + indexr.Close() + chunkr.Close() return nil, errors.Wrapf(err, "open tombstone reader") } return &blockQuerier{