Add separate head mutex

Introduce a separate mutex for the head blocks to avoid a race where
a post-compaction reload may run in the window where an appender
switches the DB's base mutex to create a new head block.
Fabian Reinartz 2017-03-04 16:50:48 +01:00
parent 3e569bc964
commit 55a9b5428a
4 changed files with 73 additions and 67 deletions
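For orientation before the diff: the race being fixed is that an appender previously had to drop its read lock on the DB's base mutex and re-acquire it as a write lock in order to create a new head block, and a post-compaction reload could slip into that gap. With a dedicated head mutex, the appender keeps holding the base read lock (which excludes reloads) and takes only the narrower head lock to cut a new head. A minimal, compilable sketch of the resulting scheme; the types here are illustrative stand-ins, not the package's real ones:

package main

import (
	"fmt"
	"sync"
)

// Stand-in for the real DB: mtx guards the overall block layout,
// headmtx guards just the head blocks. The lock order is mtx before
// headmtx, matching Appender() and Querier() in the diff below.
type db struct {
	mtx     sync.RWMutex
	headmtx sync.RWMutex
	heads   []int
}

// appendCreatingHead models an appender that must cut a new head
// block: it keeps the base read lock held the whole time, so reload()
// can never interleave, and takes only the head mutex for writing.
func (d *db) appendCreatingHead(v int) {
	d.mtx.RLock()
	defer d.mtx.RUnlock()

	d.headmtx.Lock()
	d.heads = append(d.heads, v)
	d.headmtx.Unlock()
}

// reload models a post-compaction reload: it takes the base write
// lock and is therefore excluded while any appender is active.
func (d *db) reload() {
	d.mtx.Lock()
	defer d.mtx.Unlock()
	d.heads = nil
}

func main() {
	var d db
	d.appendCreatingHead(1)
	d.reload()
	fmt.Println("ok")
}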

db.go (131 lines changed)

@@ -87,17 +87,24 @@ const sep = '\xff'
// DB handles reads and writes of time series falling into
// a hashed partition of a series database.
type DB struct {
dir string
lockf lockfile.Lockfile
logger log.Logger
metrics *dbMetrics
opts *Options
// Mutex that must be held when modifying the general
// block layout.
mtx sync.RWMutex
persisted []*persistedBlock
heads []*headBlock
seqBlocks map[int]Block
headGen uint8
// Mutex that must be held when modifying just the head blocks
// or the general layout.
headmtx sync.RWMutex
heads []*headBlock
headGen uint8
compactor Compactor
@@ -200,7 +207,15 @@ func (db *DB) run() {
case <-db.compactc:
db.metrics.compactionsTriggered.Inc()
if err := db.compact(); err != nil {
var merr MultiError
changes, err := db.compact()
merr.Add(err)
if changes {
merr.Add(db.reloadBlocks())
}
if err := merr.Err(); err != nil {
db.logger.Log("msg", "compaction failed", "err", err)
}
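The new contract in the hunk above is worth spelling out: compact() now reports whether it changed the on-disk block layout, and the run loop reloads only in that case, while errors from both steps are collected and surfaced together. A compilable sketch of the pattern; the MultiError below mirrors, but is not, the package's helper, and compact/reloadBlocks are stand-in stubs:

package main

import (
	"errors"
	"fmt"
	"strings"
)

// MultiError collects several errors and reports them as one,
// mirroring the helper used in this package.
type MultiError []error

func (es *MultiError) Add(err error) {
	if err != nil {
		*es = append(*es, err)
	}
}

func (es MultiError) Err() error {
	if len(es) == 0 {
		return nil
	}
	parts := make([]string, 0, len(es))
	for _, e := range es {
		parts = append(parts, e.Error())
	}
	return errors.New(strings.Join(parts, "; "))
}

// Stand-ins for db.compact and db.reloadBlocks.
func compact() (changes bool, err error) { return true, nil }

func reloadBlocks() error { return errors.New("reload failed") }

func main() {
	var merr MultiError
	changes, err := compact()
	merr.Add(err)
	// Reload the block layout only if compaction actually changed it.
	if changes {
		merr.Add(reloadBlocks())
	}
	if err := merr.Err(); err != nil {
		fmt.Println("msg", "compaction failed", "err", err)
	}
}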
@@ -210,12 +225,17 @@ func (db *DB) run() {
}
}
func (db *DB) compact() error {
changes := false
func (db *DB) compact() (changes bool, err error) {
// Check whether we have pending head blocks that are ready to be persisted.
// They have the highest priority.
db.mtx.RLock()
db.headmtx.RLock()
var singles []*headBlock
// Collect head blocks that are ready for compaction. Write them after
// releasing the lock to avoid blocking Appenders.
// Selected blocks are semantically ensured to not be written to afterwards
// by appendable().
if len(db.heads) > db.opts.AppendableBlocks {
for _, h := range db.heads[:len(db.heads)-db.opts.AppendableBlocks] {
// Blocks that won't be appendable when instantiating a new appender
@@ -224,36 +244,38 @@ func (db *DB) compact() error {
if atomic.LoadUint64(&h.activeWriters) > 0 {
break
}
db.logger.Log("msg", "write head", "seq", h.Meta().Sequence)
select {
case <-db.stopc:
db.mtx.RUnlock()
return nil
default:
}
if err := db.compactor.Write(h.Dir(), h); err != nil {
db.mtx.RUnlock()
return errors.Wrap(err, "persist head block")
}
changes = true
singles = append(singles, h)
}
}
db.mtx.RUnlock()
db.headmtx.RUnlock()
Loop:
for _, h := range singles {
db.logger.Log("msg", "write head", "seq", h.Meta().Sequence)
select {
case <-db.stopc:
break Loop
default:
}
if err = db.compactor.Write(h.Dir(), h); err != nil {
return changes, errors.Wrap(err, "persist head block")
}
changes = true
}
// Check for compactions of multiple blocks.
for {
plans, err := db.compactor.Plan(db.dir)
if err != nil {
return errors.Wrap(err, "plan compaction")
return changes, errors.Wrap(err, "plan compaction")
}
select {
case <-db.stopc:
return nil
return false, nil
default:
}
// We just execute compactions sequentially to not cause too extreme
@@ -264,7 +286,7 @@ func (db *DB) compact() error {
db.logger.Log("msg", "compact blocks", "seq", fmt.Sprintf("%v", p))
if err := db.compactor.Compact(p...); err != nil {
return errors.Wrapf(err, "compact %s", p)
return changes, errors.Wrapf(err, "compact %s", p)
}
changes = true
}
@@ -274,10 +296,7 @@ func (db *DB) compact() error {
}
}
if changes {
return errors.Wrap(db.reloadBlocks(), "reload blocks")
}
return nil
return changes, nil
}
// func (db *DB) retentionCutoff() error {
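One detail of the compact() rewrite above deserves emphasis: head blocks ready for persistence are selected under a short headmtx read lock but written only after it is released, so slow disk writes never block appenders, and appendable() guarantees the selected blocks receive no further writes. A compilable sketch of that collect-then-write split, with simplified stand-in types:

package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

type headBlock struct {
	seq           int
	activeWriters uint64
}

type db struct {
	headmtx          sync.RWMutex
	heads            []*headBlock
	appendableBlocks int
}

// selectCompactable picks fully written head blocks under the lock.
// It stops at the first block that still has active appenders, since
// that block and all later ones may still be written to.
func (d *db) selectCompactable() []*headBlock {
	d.headmtx.RLock()
	defer d.headmtx.RUnlock()

	var singles []*headBlock
	if len(d.heads) > d.appendableBlocks {
		for _, h := range d.heads[:len(d.heads)-d.appendableBlocks] {
			if atomic.LoadUint64(&h.activeWriters) > 0 {
				break
			}
			singles = append(singles, h)
		}
	}
	return singles
}

func main() {
	d := &db{
		heads:            []*headBlock{{seq: 1}, {seq: 2}, {seq: 3}},
		appendableBlocks: 2,
	}
	for _, h := range d.selectCompactable() {
		// Slow persistence happens here, outside the lock.
		fmt.Println("write head", h.seq)
	}
}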
@@ -381,10 +400,11 @@ func (db *DB) Close() error {
close(db.stopc)
<-db.donec
var merr MultiError
// Lock mutex and leave it locked so we panic if there's a bug causing
// the block to be used afterwards.
db.mtx.Lock()
defer db.mtx.Unlock()
var merr MultiError
for _, pb := range db.persisted {
merr.Add(pb.Close())
@@ -403,9 +423,14 @@ func (db *DB) Appender() Appender {
db.mtx.RLock()
a := &dbAppender{db: db}
db.headmtx.RLock()
for _, b := range db.appendable() {
a.heads = append(a.heads, b.Appender().(*headAppender))
}
db.headmtx.RUnlock()
return a
}
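As the hunk above shows, Appender() now snapshots the appendable head blocks under a brief headmtx read lock; the returned appender then works off its private slice and needs no further head locking until a new head must be created. A compilable sketch of this snapshot-under-read-lock approach, again with stand-in types:

package main

import (
	"fmt"
	"sync"
)

type headAppender struct{ seq int }

type headBlock struct{ seq int }

func (h *headBlock) Appender() *headAppender { return &headAppender{seq: h.seq} }

type db struct {
	headmtx sync.RWMutex
	heads   []*headBlock
}

type dbAppender struct {
	db    *db
	heads []*headAppender
}

// Appender snapshots per-head appenders under a short read lock.
// The returned appender owns its slice afterwards, so ordinary
// appends never contend on headmtx.
func (d *db) Appender() *dbAppender {
	a := &dbAppender{db: d}

	d.headmtx.RLock()
	for _, b := range d.heads {
		a.heads = append(a.heads, b.Appender())
	}
	d.headmtx.RUnlock()

	return a
}

func main() {
	d := &db{heads: []*headBlock{{seq: 1}, {seq: 2}}}
	a := d.Appender()
	fmt.Println("appenders:", len(a.heads))
}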
@@ -468,15 +493,12 @@ func (a *dbAppender) AddFast(ref uint64, t int64, v float64) error {
func (a *dbAppender) appenderFor(t int64) (*headAppender, error) {
// If there's no fitting head block for t, ensure it gets created.
if len(a.heads) == 0 || t >= a.heads[len(a.heads)-1].meta.MaxTime {
a.db.mtx.RUnlock()
a.db.headmtx.Lock()
if err := a.db.ensureHead(t); err != nil {
a.db.mtx.RLock()
a.db.headmtx.Unlock()
return nil, err
}
a.db.mtx.RLock()
if len(a.heads) == 0 {
for _, b := range a.db.appendable() {
a.heads = append(a.heads, b.Appender().(*headAppender))
@@ -489,6 +511,8 @@ func (a *dbAppender) appenderFor(t int64) (*headAppender, error) {
}
}
}
a.db.headmtx.Unlock()
}
for i := len(a.heads) - 1; i >= 0; i-- {
if h := a.heads[i]; t >= h.meta.MinTime {
@@ -500,8 +524,8 @@ func (a *dbAppender) appenderFor(t int64) (*headAppender, error) {
}
func (db *DB) ensureHead(t int64) error {
db.mtx.Lock()
defer db.mtx.Unlock()
// db.mtx.Lock()
// defer db.mtx.Unlock()
// Initial case for a new database: we must create the first
// AppendableBlocks-1 front padding heads.
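The comment above refers to front padding: a fresh database has no heads at all, so ensureHead must create the AppendableBlocks-1 ranges preceding t in addition to the one covering it, giving appendable() a full window from the start. A conceptual, compilable sketch of the idea; the constants and helper below are illustrative, not the package's real ones:

package main

import "fmt"

// Illustrative settings; the real values come from Options.
const (
	appendableBlocks = 3
	blockRange       = int64(3600)
)

type head struct{ minTime, maxTime int64 }

// ensureHead sketches the initial case: with no heads yet, create
// the padding ranges before t plus the range covering t itself.
func ensureHead(heads []head, t int64) []head {
	if len(heads) != 0 {
		return heads
	}
	start := t/blockRange*blockRange - int64(appendableBlocks-1)*blockRange
	for min := start; min <= t; min += blockRange {
		heads = append(heads, head{minTime: min, maxTime: min + blockRange})
	}
	return heads
}

func main() {
	for _, h := range ensureHead(nil, 7200) {
		fmt.Printf("head [%d, %d)\n", h.minTime, h.maxTime)
	}
}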
@@ -557,31 +581,6 @@ func (db *DB) appendable() []*headBlock {
return db.heads[len(db.heads)-db.opts.AppendableBlocks:]
}
func (db *DB) compactable() []Block {
db.mtx.RLock()
defer db.mtx.RUnlock()
var blocks []Block
for _, pb := range db.persisted {
blocks = append(blocks, pb)
}
if len(db.heads) <= db.opts.AppendableBlocks {
return blocks
}
for _, h := range db.heads[:len(db.heads)-db.opts.AppendableBlocks] {
// Blocks that won't be appendable when instantiating a new appender
// might still have active appenders on them.
// Abort at the first one we encounter.
if atomic.LoadUint64(&h.activeWriters) > 0 {
break
}
blocks = append(blocks, h)
}
return blocks
}
func intervalOverlap(amin, amax, bmin, bmax int64) bool {
if bmin >= amin && bmin <= amax {
return true

head.go

@@ -145,6 +145,10 @@ func (h *headBlock) inBounds(t int64) bool {
// Close syncs all data and closes underlying resources of the head block.
func (h *headBlock) Close() error {
// Lock mutex and leave it locked so we panic if there's a bug causing
// the block to be used afterwards.
h.mtx.Lock()
if err := h.wal.Close(); err != nil {
return err
}
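This head.go hunk applies the same pattern as the Close methods of DB and WAL in this commit: take the mutex and deliberately never release it, so that any later use of the closed object blocks at once (typically surfacing as a deadlock panic in tests) instead of silently touching closed resources. A compilable illustration of this poison-lock idea:

package main

import (
	"fmt"
	"sync"
)

// resource demonstrates the poison-lock pattern: Close takes the
// mutex and intentionally leaves it locked, so buggy use-after-close
// cannot go unnoticed.
type resource struct {
	mtx    sync.Mutex
	closed bool
}

func (r *resource) Close() error {
	r.mtx.Lock() // intentionally never unlocked
	r.closed = true
	return nil
}

func (r *resource) Use() {
	r.mtx.Lock() // hangs forever if called after Close
	defer r.mtx.Unlock()
	fmt.Println("using resource")
}

func main() {
	r := &resource{}
	r.Use()
	_ = r.Close()
	// r.Use() // would block here, loudly surfacing the misuse
}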

querier.go

@@ -47,7 +47,9 @@ type querier struct {
func (s *DB) Querier(mint, maxt int64) Querier {
s.mtx.RLock()
s.headmtx.RLock()
blocks := s.blocksForInterval(mint, maxt)
s.headmtx.RUnlock()
sq := &querier{
blocks: make([]Querier, 0, len(blocks)),

wal.go (3 lines changed)

@@ -265,8 +265,9 @@ func (w *WAL) Close() error {
close(w.stopc)
<-w.donec
// Lock mutex and leave it locked so we panic if there's a bug causing
// the block to be used afterwards.
w.mtx.Lock()
defer w.mtx.Unlock()
if err := w.sync(); err != nil {
return err