Fix and document locking order for DB
This commit is contained in:
parent
47afc8e00f
commit
3065be97d8
16
db.go
16
db.go
|
@ -99,14 +99,14 @@ type DB struct {
|
|||
metrics *dbMetrics
|
||||
opts *Options
|
||||
|
||||
// Mutex for that must be held when modifying the general
|
||||
// block layout.
|
||||
// Mutex for that must be held when modifying the general block layout.
|
||||
// cmtx must be held before acquiring it.
|
||||
mtx sync.RWMutex
|
||||
blocks []Block
|
||||
|
||||
// Mutex that must be held when modifying just the head blocks
|
||||
// or the general layout.
|
||||
// Must never be held when acquiring a blocks's mutex!
|
||||
// mtx must be held before acquiring.
|
||||
headmtx sync.RWMutex
|
||||
heads []headBlock
|
||||
|
||||
|
@ -341,6 +341,9 @@ func (db *DB) appendableHeads() (r []headBlock) {
|
|||
}
|
||||
|
||||
func (db *DB) completedHeads() (r []headBlock) {
|
||||
db.mtx.RLock()
|
||||
defer db.mtx.RUnlock()
|
||||
|
||||
db.headmtx.RLock()
|
||||
defer db.headmtx.RUnlock()
|
||||
|
||||
|
@ -594,12 +597,12 @@ func (db *DB) EnableCompactions() {
|
|||
|
||||
// Snapshot writes the current data to the directory.
|
||||
func (db *DB) Snapshot(dir string) error {
|
||||
db.mtx.Lock() // To block any appenders.
|
||||
defer db.mtx.Unlock()
|
||||
|
||||
db.cmtx.Lock()
|
||||
defer db.cmtx.Unlock()
|
||||
|
||||
db.mtx.Lock() // To block any appenders.
|
||||
defer db.mtx.Unlock()
|
||||
|
||||
blocks := db.blocks[:]
|
||||
for _, b := range blocks {
|
||||
db.logger.Log("msg", "snapshotting block", "block", b)
|
||||
|
@ -804,6 +807,7 @@ func (a *dbAppender) Rollback() error {
|
|||
func (db *DB) Delete(mint, maxt int64, ms ...labels.Matcher) error {
|
||||
db.cmtx.Lock()
|
||||
defer db.cmtx.Unlock()
|
||||
|
||||
db.mtx.Lock()
|
||||
defer db.mtx.Unlock()
|
||||
|
||||
|
|
|
@ -60,10 +60,10 @@ func TestDataAvailableOnlyAfterCommit(t *testing.T) {
|
|||
require.NoError(t, err)
|
||||
|
||||
querier := db.Querier(0, 1)
|
||||
defer querier.Close()
|
||||
seriesSet, err := readSeriesSet(querier.Select(labels.NewEqualMatcher("foo", "bar")))
|
||||
require.NoError(t, err)
|
||||
require.Equal(t, seriesSet, map[string][]sample{})
|
||||
require.NoError(t, querier.Close())
|
||||
|
||||
err = app.Commit()
|
||||
require.NoError(t, err)
|
||||
|
|
|
@ -53,8 +53,8 @@ type querier struct {
|
|||
blocks []Querier
|
||||
}
|
||||
|
||||
// Querier returns a new querier over the data partition for the given
|
||||
// time range.
|
||||
// Querier returns a new querier over the data partition for the given time range.
|
||||
// A goroutine must not handle more than one open Querier.
|
||||
func (s *DB) Querier(mint, maxt int64) Querier {
|
||||
s.mtx.RLock()
|
||||
|
||||
|
|
Loading…
Reference in New Issue