Lock shards during querying and shutdown
This commit is contained in:
parent
286293802b
commit
3a5468f251
3
db.go
3
db.go
|
@ -226,6 +226,9 @@ func OpenShard(path string, logger log.Logger) (*Shard, error) {
|
|||
|
||||
// Close the shard.
|
||||
func (s *Shard) Close() error {
|
||||
s.mtx.Lock()
|
||||
defer s.mtx.Unlock()
|
||||
|
||||
var e MultiError
|
||||
|
||||
for _, pb := range s.persisted {
|
||||
|
|
21
querier.go
21
querier.go
|
@ -120,23 +120,33 @@ func (q *querier) LabelValuesFor(string, labels.Label) ([]string, error) {
|
|||
}
|
||||
|
||||
func (q *querier) Close() error {
|
||||
return nil
|
||||
var merr MultiError
|
||||
|
||||
for _, sq := range q.shards {
|
||||
merr.Add(sq.Close())
|
||||
}
|
||||
return merr.Err()
|
||||
}
|
||||
|
||||
// shardQuerier aggregates querying results from time blocks within
|
||||
// a single shard.
|
||||
type shardQuerier struct {
|
||||
shard *Shard
|
||||
blocks []Querier
|
||||
}
|
||||
|
||||
// Querier returns a new querier over the data shard for the given
|
||||
// time range.
|
||||
func (s *Shard) Querier(mint, maxt int64) Querier {
|
||||
s.mtx.RLock()
|
||||
|
||||
blocks := s.blocksForInterval(mint, maxt)
|
||||
|
||||
sq := &shardQuerier{
|
||||
blocks: make([]Querier, 0, len(blocks)),
|
||||
shard: s,
|
||||
}
|
||||
|
||||
for _, b := range blocks {
|
||||
sq.blocks = append(sq.blocks, b.Querier(mint, maxt))
|
||||
}
|
||||
|
@ -182,7 +192,14 @@ func (q *shardQuerier) Select(ms ...labels.Matcher) SeriesSet {
|
|||
}
|
||||
|
||||
func (q *shardQuerier) Close() error {
|
||||
return nil
|
||||
var merr MultiError
|
||||
|
||||
for _, bq := range q.blocks {
|
||||
merr.Add(bq.Close())
|
||||
}
|
||||
q.shard.mtx.RUnlock()
|
||||
|
||||
return merr.Err()
|
||||
}
|
||||
|
||||
// blockQuerier provides querying access to a single block database.
|
||||
|
|
Loading…
Reference in New Issue