diff --git a/db.go b/db.go
index eb640b103..b1a3a2421 100644
--- a/db.go
+++ b/db.go
@@ -87,17 +87,24 @@ const sep = '\xff'
 
 // DB handles reads and writes of time series falling into
 // a hashed partition of a seriedb.
 type DB struct {
-	dir   string
-	lockf lockfile.Lockfile
+	dir    string
+	lockf  lockfile.Lockfile
+	logger log.Logger
 
 	metrics *dbMetrics
 	opts    *Options
 
+	// Mutex that must be held when modifying the general
+	// block layout.
 	mtx       sync.RWMutex
 	persisted []*persistedBlock
-	heads     []*headBlock
 	seqBlocks map[int]Block
-	headGen   uint8
+
+	// Mutex that must be held when modifying just the head blocks
+	// or the general layout.
+	headmtx sync.RWMutex
+	heads   []*headBlock
+	headGen uint8
 
 	compactor Compactor
@@ -200,7 +207,15 @@ func (db *DB) run() {
 		case <-db.compactc:
 			db.metrics.compactionsTriggered.Inc()
 
-			if err := db.compact(); err != nil {
+			var merr MultiError
+
+			changes, err := db.compact()
+			merr.Add(err)
+
+			if changes {
+				merr.Add(db.reloadBlocks())
+			}
+			if err := merr.Err(); err != nil {
 				db.logger.Log("msg", "compaction failed", "err", err)
 			}
 
@@ -210,12 +225,17 @@ func (db *DB) run() {
 	}
 }
 
-func (db *DB) compact() error {
-	changes := false
+func (db *DB) compact() (changes bool, err error) {
 
 	// Check whether we have pending head blocks that are ready to be persisted.
 	// They have the highest priority.
-	db.mtx.RLock()
+	db.headmtx.RLock()
+	var singles []*headBlock
+
+	// Collect head blocks that are ready for compaction. Write them after
+	// releasing the lock so we do not block Appenders.
+	// Selected blocks are semantically ensured not to be written to afterwards
+	// by appendable().
 	if len(db.heads) > db.opts.AppendableBlocks {
 		for _, h := range db.heads[:len(db.heads)-db.opts.AppendableBlocks] {
 			// Blocks that won't be appendable when instantiating a new appender
@@ -224,36 +244,38 @@ func (db *DB) compact() error {
 			if atomic.LoadUint64(&h.activeWriters) > 0 {
 				break
 			}
-
-			db.logger.Log("msg", "write head", "seq", h.Meta().Sequence)
-
-			select {
-			case <-db.stopc:
-				db.mtx.RUnlock()
-				return nil
-			default:
-			}
-
-			if err := db.compactor.Write(h.Dir(), h); err != nil {
-				db.mtx.RUnlock()
-				return errors.Wrap(err, "persist head block")
-			}
-			changes = true
+			singles = append(singles, h)
 		}
 	}
 
-	db.mtx.RUnlock()
+	db.headmtx.RUnlock()
+
+Loop:
+	for _, h := range singles {
+		db.logger.Log("msg", "write head", "seq", h.Meta().Sequence)
+
+		select {
+		case <-db.stopc:
+			break Loop
+		default:
+		}
+
+		if err = db.compactor.Write(h.Dir(), h); err != nil {
+			return changes, errors.Wrap(err, "persist head block")
+		}
+		changes = true
+	}
 
 	// Check for compactions of multiple blocks.
 	for {
 		plans, err := db.compactor.Plan(db.dir)
 		if err != nil {
-			return errors.Wrap(err, "plan compaction")
+			return changes, errors.Wrap(err, "plan compaction")
 		}
 
 		select {
 		case <-db.stopc:
-			return nil
+			return false, nil
 		default:
 		}
 		// We just execute compactions sequentially to not cause too extreme
@@ -264,7 +286,7 @@ func (db *DB) compact() error {
 		db.logger.Log("msg", "compact blocks", "seq", fmt.Sprintf("%v", p))
 
 		if err := db.compactor.Compact(p...); err != nil {
-			return errors.Wrapf(err, "compact %s", p)
+			return changes, errors.Wrapf(err, "compact %s", p)
 		}
 		changes = true
 
@@ -274,10 +296,7 @@
 		}
 	}
 
-	if changes {
-		return errors.Wrap(db.reloadBlocks(), "reload blocks")
-	}
-	return nil
+	return changes, nil
 }
 
 // func (db *DB) retentionCutoff() error {
@@ -381,10 +400,11 @@ func (db *DB) Close() error {
 	close(db.stopc)
 	<-db.donec
 
-	var merr MultiError
-
+	// Lock mutex and leave it locked so we panic if there's a bug causing
+	// the DB to be used afterwards.
 	db.mtx.Lock()
-	defer db.mtx.Unlock()
+
+	var merr MultiError
 
 	for _, pb := range db.persisted {
 		merr.Add(pb.Close())
@@ -403,9 +423,14 @@ func (db *DB) Appender() Appender {
 	db.mtx.RLock()
 	a := &dbAppender{db: db}
 
+	db.headmtx.RLock()
+
 	for _, b := range db.appendable() {
 		a.heads = append(a.heads, b.Appender().(*headAppender))
 	}
+
+	db.headmtx.RUnlock()
+
 	return a
 }
 
@@ -468,15 +493,12 @@ func (a *dbAppender) AddFast(ref uint64, t int64, v float64) error {
 func (a *dbAppender) appenderFor(t int64) (*headAppender, error) {
 	// If there's no fitting head block for t, ensure it gets created.
 	if len(a.heads) == 0 || t >= a.heads[len(a.heads)-1].meta.MaxTime {
-		a.db.mtx.RUnlock()
+		a.db.headmtx.Lock()
 
 		if err := a.db.ensureHead(t); err != nil {
-			a.db.mtx.RLock()
+			a.db.headmtx.Unlock()
 			return nil, err
 		}
-
-		a.db.mtx.RLock()
-
 		if len(a.heads) == 0 {
 			for _, b := range a.db.appendable() {
 				a.heads = append(a.heads, b.Appender().(*headAppender))
@@ -489,6 +511,8 @@ func (a *dbAppender) appenderFor(t int64) (*headAppender, error) {
 				}
 			}
 		}
+
+		a.db.headmtx.Unlock()
 	}
 	for i := len(a.heads) - 1; i >= 0; i-- {
 		if h := a.heads[i]; t >= h.meta.MinTime {
@@ -500,8 +524,8 @@ func (a *dbAppender) appenderFor(t int64) (*headAppender, error) {
 }
 
 func (db *DB) ensureHead(t int64) error {
-	db.mtx.Lock()
-	defer db.mtx.Unlock()
+	// db.mtx.Lock()
+	// defer db.mtx.Unlock()
 
 	// Initial case for a new database: we must create the first
 	// AppendableBlocks-1 front padding heads.
@@ -557,31 +581,6 @@ func (db *DB) appendable() []*headBlock {
 	return db.heads[len(db.heads)-db.opts.AppendableBlocks:]
 }
 
-func (db *DB) compactable() []Block {
-	db.mtx.RLock()
-	defer db.mtx.RUnlock()
-
-	var blocks []Block
-	for _, pb := range db.persisted {
-		blocks = append(blocks, pb)
-	}
-
-	if len(db.heads) <= db.opts.AppendableBlocks {
-		return blocks
-	}
-
-	for _, h := range db.heads[:len(db.heads)-db.opts.AppendableBlocks] {
-		// Blocks that won't be appendable when instantiating a new appender
-		// might still have active appenders on them.
-		// Abort at the first one we encounter.
-		if atomic.LoadUint64(&h.activeWriters) > 0 {
-			break
-		}
-		blocks = append(blocks, h)
-	}
-	return blocks
-}
-
 func intervalOverlap(amin, amax, bmin, bmax int64) bool {
 	if bmin >= amin && bmin <= amax {
 		return true
diff --git a/head.go b/head.go
index 969bf3416..058c8fe79 100644
--- a/head.go
+++ b/head.go
@@ -145,6 +145,10 @@ func (h *headBlock) inBounds(t int64) bool {
 
 // Close syncs all data and closes underlying resources of the head block.
 func (h *headBlock) Close() error {
+	// Lock mutex and leave it locked so we panic if there's a bug causing
+	// the block to be used afterwards.
+	h.mtx.Lock()
+
 	if err := h.wal.Close(); err != nil {
 		return err
 	}
diff --git a/querier.go b/querier.go
index 5f02c77f7..7783ef312 100644
--- a/querier.go
+++ b/querier.go
@@ -47,7 +47,9 @@ type querier struct {
 func (s *DB) Querier(mint, maxt int64) Querier {
 	s.mtx.RLock()
 
+	s.headmtx.RLock()
 	blocks := s.blocksForInterval(mint, maxt)
+	s.headmtx.RUnlock()
 
 	sq := &querier{
 		blocks: make([]Querier, 0, len(blocks)),
diff --git a/wal.go b/wal.go
index 5962711f9..8b88d110d 100644
--- a/wal.go
+++ b/wal.go
@@ -265,8 +265,9 @@ func (w *WAL) Close() error {
 	close(w.stopc)
 	<-w.donec
 
+	// Lock mutex and leave it locked so we panic if there's a bug causing
+	// the WAL to be used afterwards.
 	w.mtx.Lock()
-	defer w.mtx.Unlock()
 
 	if err := w.sync(); err != nil {
 		return err
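A minimal sketch (not part of the patch) of the "lock and leave locked" Close pattern the new comments describe: Close takes the write lock and never releases it, so a buggy post-Close caller blocks on the mutex instead of touching released state, and if that leaves every goroutine blocked, the Go runtime aborts with a deadlock panic, surfacing the bug loudly. The box type below is hypothetical and only illustrates the idea.

package main

import "sync"

type box struct {
	mtx  sync.RWMutex
	data []byte
}

// Close releases the data and intentionally leaves mtx locked.
func (b *box) Close() error {
	b.mtx.Lock()
	b.data = nil
	return nil
}

// Get blocks forever if called after Close.
func (b *box) Get() []byte {
	b.mtx.RLock()
	defer b.mtx.RUnlock()
	return b.data
}

func main() {
	b := &box{data: []byte("x")}
	_ = b.Get() // fine before Close
	_ = b.Close()
	// A b.Get() call here would block on mtx; once no goroutine can make
	// progress, the runtime aborts: "all goroutines are asleep - deadlock!".
}

Note that the abort comes from the runtime's deadlock detector, so a stray post-Close call only crashes the process when every goroutine is blocked; otherwise it simply hangs, which still localizes the bug at the mutex rather than letting it corrupt released state.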