diff --git a/db.go b/db.go index 4f10fe9e8..1c56ae7d0 100644 --- a/db.go +++ b/db.go @@ -99,14 +99,14 @@ type DB struct { metrics *dbMetrics opts *Options - // Mutex for that must be held when modifying the general - // block layout. + // Mutex for that must be held when modifying the general block layout. + // cmtx must be held before acquiring it. mtx sync.RWMutex blocks []Block // Mutex that must be held when modifying just the head blocks // or the general layout. - // Must never be held when acquiring a blocks's mutex! + // mtx must be held before acquiring. headmtx sync.RWMutex heads []headBlock @@ -341,6 +341,9 @@ func (db *DB) appendableHeads() (r []headBlock) { } func (db *DB) completedHeads() (r []headBlock) { + db.mtx.RLock() + defer db.mtx.RUnlock() + db.headmtx.RLock() defer db.headmtx.RUnlock() @@ -594,12 +597,12 @@ func (db *DB) EnableCompactions() { // Snapshot writes the current data to the directory. func (db *DB) Snapshot(dir string) error { - db.mtx.Lock() // To block any appenders. - defer db.mtx.Unlock() - db.cmtx.Lock() defer db.cmtx.Unlock() + db.mtx.Lock() // To block any appenders. + defer db.mtx.Unlock() + blocks := db.blocks[:] for _, b := range blocks { db.logger.Log("msg", "snapshotting block", "block", b) @@ -804,6 +807,7 @@ func (a *dbAppender) Rollback() error { func (db *DB) Delete(mint, maxt int64, ms ...labels.Matcher) error { db.cmtx.Lock() defer db.cmtx.Unlock() + db.mtx.Lock() defer db.mtx.Unlock() diff --git a/db_test.go b/db_test.go index e410dfceb..d4b3ca8bb 100644 --- a/db_test.go +++ b/db_test.go @@ -60,10 +60,10 @@ func TestDataAvailableOnlyAfterCommit(t *testing.T) { require.NoError(t, err) querier := db.Querier(0, 1) - defer querier.Close() seriesSet, err := readSeriesSet(querier.Select(labels.NewEqualMatcher("foo", "bar"))) require.NoError(t, err) require.Equal(t, seriesSet, map[string][]sample{}) + require.NoError(t, querier.Close()) err = app.Commit() require.NoError(t, err) diff --git a/querier.go b/querier.go index 523a67398..cfe53cd57 100644 --- a/querier.go +++ b/querier.go @@ -53,8 +53,8 @@ type querier struct { blocks []Querier } -// Querier returns a new querier over the data partition for the given -// time range. +// Querier returns a new querier over the data partition for the given time range. +// A goroutine must not handle more than one open Querier. func (s *DB) Querier(mint, maxt int64) Querier { s.mtx.RLock()