From 65b846ae5b1ed531901db0b2804a61a4adcf3b1b Mon Sep 17 00:00:00 2001 From: Fabian Reinartz Date: Fri, 17 Mar 2017 12:12:50 +0100 Subject: [PATCH] Remove unreturned locks, detect writes on closed heads --- db.go | 3 +-- head.go | 10 ++++++++-- querier.go | 15 ++++++--------- 3 files changed, 15 insertions(+), 13 deletions(-) diff --git a/db.go b/db.go index 7124ce106..9ddad0d5c 100644 --- a/db.go +++ b/db.go @@ -401,9 +401,8 @@ func (db *DB) Close() error { close(db.stopc) <-db.donec - // Lock mutex and leave it locked so we panic if there's a bug causing - // the block to be used afterwards. db.mtx.Lock() + defer db.mtx.Unlock() var g errgroup.Group diff --git a/head.go b/head.go index 43290a877..eed2ff222 100644 --- a/head.go +++ b/head.go @@ -42,6 +42,7 @@ type headBlock struct { wal *WAL activeWriters uint64 + closed bool // descs holds all chunk descs for the head block. Each chunk implicitly // is assigned the index as its ID. @@ -137,9 +138,8 @@ func (h *headBlock) inBounds(t int64) bool { // Close syncs all data and closes underlying resources of the head block. func (h *headBlock) Close() error { - // Lock mutex and leave it locked so we panic if there's a bug causing - // the block to be used afterwards. h.mtx.Lock() + defer h.mtx.Unlock() if err := h.wal.Close(); err != nil { return errors.Wrapf(err, "close WAL for head %s", h.dir) @@ -156,6 +156,8 @@ func (h *headBlock) Close() error { if meta.ULID == h.meta.ULID { return writeMetaFile(h.dir, &h.meta) } + + h.closed = true return nil } @@ -175,6 +177,10 @@ func (h *headBlock) Appender() Appender { atomic.AddUint64(&h.activeWriters, 1) h.mtx.RLock() + + if h.closed { + panic(fmt.Sprintf("block %s already closed", h.dir)) + } return &headAppender{headBlock: h, samples: getHeadAppendBuffer()} } diff --git a/querier.go b/querier.go index 3c42baba3..c027a8dc0 100644 --- a/querier.go +++ b/querier.go @@ -66,6 +66,12 @@ func (s *DB) Querier(mint, maxt int64) Querier { // TODO(fabxc): find nicer solution. if hb, ok := b.(*headBlock); ok { + // TODO(fabxc): temporary refactored. + hb.mtx.RLock() + if hb.closed { + panic(fmt.Sprintf("block %s already closed", hb.dir)) + } + hb.mtx.RUnlock() q.postingsMapper = hb.remapPostings } @@ -135,15 +141,6 @@ type blockQuerier struct { mint, maxt int64 } -func newBlockQuerier(ix IndexReader, c ChunkReader, mint, maxt int64) *blockQuerier { - return &blockQuerier{ - mint: mint, - maxt: maxt, - index: ix, - chunks: c, - } -} - func (q *blockQuerier) Select(ms ...labels.Matcher) SeriesSet { var ( its []Postings