mirror of
https://github.com/prometheus/prometheus
synced 2024-12-26 00:23:18 +00:00
Close previous block queriers on error
This ensures we close all previously opened queriers if on of the block querier fails to open. Also swap in new blocks before closing old ones to avoid the situation in general. Make read locking of blocks more conservative to avoid unnecessary retries by clients, e.g. when blocks are getting closed before we can successfully instantiate querier against them.
This commit is contained in:
parent
9749aa2a3e
commit
f8e88bfdb7
49
db.go
49
db.go
@ -448,9 +448,6 @@ func (db *DB) reload() (err error) {
|
||||
db.metrics.reloads.Inc()
|
||||
}()
|
||||
|
||||
var cs []io.Closer
|
||||
defer func() { closeAll(cs...) }()
|
||||
|
||||
dirs, err := blockDirs(db.dir)
|
||||
if err != nil {
|
||||
return errors.Wrap(err, "find blocks")
|
||||
@ -482,25 +479,25 @@ func (db *DB) reload() (err error) {
|
||||
return errors.Wrap(err, "invalid block sequence")
|
||||
}
|
||||
|
||||
// Close all opened blocks that no longer exist after we returned all locks.
|
||||
// TODO(fabxc: probably races with querier still reading from them. Can
|
||||
// we just abandon them and have the open FDs be GC'd automatically eventually?
|
||||
for _, b := range db.blocks {
|
||||
if _, ok := exist[b.Meta().ULID]; !ok {
|
||||
cs = append(cs, b)
|
||||
}
|
||||
}
|
||||
|
||||
// Swap in new blocks first for subsequently created readers to be seen.
|
||||
// Then close previous blocks, which may block for pending readers to complete.
|
||||
db.mtx.Lock()
|
||||
oldBlocks := db.blocks
|
||||
db.blocks = blocks
|
||||
db.mtx.Unlock()
|
||||
|
||||
for _, b := range oldBlocks {
|
||||
if _, ok := exist[b.Meta().ULID]; !ok {
|
||||
b.Close()
|
||||
}
|
||||
}
|
||||
|
||||
// Garbage collect data in the head if the most recent persisted block
|
||||
// covers data of its current time range.
|
||||
if len(blocks) == 0 {
|
||||
return nil
|
||||
}
|
||||
maxt := blocks[len(db.blocks)-1].Meta().MaxTime
|
||||
maxt := blocks[len(blocks)-1].Meta().MaxTime
|
||||
|
||||
return errors.Wrap(db.head.Truncate(maxt), "head truncate failed")
|
||||
}
|
||||
@ -593,7 +590,10 @@ func (db *DB) Snapshot(dir string) error {
|
||||
db.cmtx.Lock()
|
||||
defer db.cmtx.Unlock()
|
||||
|
||||
for _, b := range db.Blocks() {
|
||||
db.mtx.RLock()
|
||||
defer db.mtx.RUnlock()
|
||||
|
||||
for _, b := range db.blocks {
|
||||
level.Info(db.logger).Log("msg", "snapshotting block", "block", b)
|
||||
|
||||
if err := b.Snapshot(dir); err != nil {
|
||||
@ -608,7 +608,10 @@ func (db *DB) Snapshot(dir string) error {
|
||||
func (db *DB) Querier(mint, maxt int64) (Querier, error) {
|
||||
var blocks []BlockReader
|
||||
|
||||
for _, b := range db.Blocks() {
|
||||
db.mtx.RLock()
|
||||
defer db.mtx.RUnlock()
|
||||
|
||||
for _, b := range db.blocks {
|
||||
m := b.Meta()
|
||||
if intervalOverlap(mint, maxt, m.MinTime, m.MaxTime) {
|
||||
blocks = append(blocks, b)
|
||||
@ -623,10 +626,15 @@ func (db *DB) Querier(mint, maxt int64) (Querier, error) {
|
||||
}
|
||||
for _, b := range blocks {
|
||||
q, err := NewBlockQuerier(b, mint, maxt)
|
||||
if err != nil {
|
||||
return nil, errors.Wrapf(err, "open querier for block %s", b)
|
||||
if err == nil {
|
||||
sq.blocks = append(sq.blocks, q)
|
||||
continue
|
||||
}
|
||||
sq.blocks = append(sq.blocks, q)
|
||||
// If we fail, all previously opened queriers must be closed.
|
||||
for _, q := range sq.blocks {
|
||||
q.Close()
|
||||
}
|
||||
return nil, errors.Wrapf(err, "open querier for block %s", b)
|
||||
}
|
||||
return sq, nil
|
||||
}
|
||||
@ -643,7 +651,10 @@ func (db *DB) Delete(mint, maxt int64, ms ...labels.Matcher) error {
|
||||
|
||||
var g errgroup.Group
|
||||
|
||||
for _, b := range db.Blocks() {
|
||||
db.mtx.RLock()
|
||||
defer db.mtx.RUnlock()
|
||||
|
||||
for _, b := range db.blocks {
|
||||
m := b.Meta()
|
||||
if intervalOverlap(mint, maxt, m.MinTime, m.MaxTime) {
|
||||
g.Go(func(b *Block) func() error {
|
||||
|
@ -114,10 +114,13 @@ func NewBlockQuerier(b BlockReader, mint, maxt int64) (Querier, error) {
|
||||
}
|
||||
chunkr, err := b.Chunks()
|
||||
if err != nil {
|
||||
indexr.Close()
|
||||
return nil, errors.Wrapf(err, "open chunk reader")
|
||||
}
|
||||
tombsr, err := b.Tombstones()
|
||||
if err != nil {
|
||||
indexr.Close()
|
||||
chunkr.Close()
|
||||
return nil, errors.Wrapf(err, "open tombstone reader")
|
||||
}
|
||||
return &blockQuerier{
|
||||
|
Loading…
Reference in New Issue
Block a user