Count writer references on head blocks

This commit is contained in:
Fabian Reinartz 2017-02-04 11:53:52 +01:00
parent 5a1c8eaa0e
commit 012cf4ef25
3 changed files with 17 additions and 3 deletions

View File

@ -122,7 +122,7 @@ func (b *writeBenchmark) run(cmd *cobra.Command, args []string) {
dur := measureTime("ingestScrapes", func() { dur := measureTime("ingestScrapes", func() {
b.startProfiling() b.startProfiling()
total, err = b.ingestScrapes(metrics, 4000) total, err = b.ingestScrapes(metrics, 3000)
if err != nil { if err != nil {
exitWithError(err) exitWithError(err)
} }

9
db.go
View File

@ -12,6 +12,7 @@ import (
"strconv" "strconv"
"strings" "strings"
"sync" "sync"
"sync/atomic"
"time" "time"
"unsafe" "unsafe"
@ -29,7 +30,7 @@ import (
var DefaultOptions = &Options{ var DefaultOptions = &Options{
WALFlushInterval: 5 * time.Second, WALFlushInterval: 5 * time.Second,
MinBlockDuration: 2 * 60 * 60 * 1000, // 2 hours in milliseconds MinBlockDuration: 2 * 60 * 60 * 1000, // 2 hours in milliseconds
MaxBlockDuration: 48 * 60 * 60 * 1000, // 1 day in milliseconds MaxBlockDuration: 48 * 60 * 60 * 1000, // 2 days in milliseconds
AppendableBlocks: 2, AppendableBlocks: 2,
} }
@ -503,6 +504,12 @@ func (db *DB) compactable() []Block {
} }
for _, h := range db.heads[:len(db.heads)-db.opts.AppendableBlocks] { for _, h := range db.heads[:len(db.heads)-db.opts.AppendableBlocks] {
// Blocks that won't be appendable when instantiating a new appender
// might still have active appenders on them.
// Abort at the first one we encounter.
if atomic.LoadUint64(&h.activeWriters) > 0 {
break
}
blocks = append(blocks, h) blocks = append(blocks, h)
} }
return blocks return blocks

View File

@ -7,6 +7,7 @@ import (
"os" "os"
"sort" "sort"
"sync" "sync"
"sync/atomic"
"time" "time"
"github.com/bradfitz/slice" "github.com/bradfitz/slice"
@ -40,6 +41,8 @@ type headBlock struct {
generation uint8 generation uint8
wal *WAL wal *WAL
activeWriters uint64
// descs holds all chunk descs for the head block. Each chunk implicitly // descs holds all chunk descs for the head block. Each chunk implicitly
// is assigned the index as its ID. // is assigned the index as its ID.
series []*memSeries series []*memSeries
@ -147,6 +150,8 @@ func (h *headBlock) Index() IndexReader { return &headIndexReader{h} }
func (h *headBlock) Series() SeriesReader { return &headSeriesReader{h} } func (h *headBlock) Series() SeriesReader { return &headSeriesReader{h} }
func (h *headBlock) Appender() Appender { func (h *headBlock) Appender() Appender {
atomic.AddUint64(&h.activeWriters, 1)
h.mtx.RLock() h.mtx.RLock()
return &headAppender{headBlock: h, samples: getHeadAppendBuffer()} return &headAppender{headBlock: h, samples: getHeadAppendBuffer()}
} }
@ -295,6 +300,7 @@ func (a *headAppender) createSeries() {
} }
func (a *headAppender) Commit() error { func (a *headAppender) Commit() error {
defer atomic.AddUint64(&a.activeWriters, ^uint64(0))
defer putHeadAppendBuffer(a.samples) defer putHeadAppendBuffer(a.samples)
a.createSeries() a.createSeries()
@ -345,8 +351,9 @@ func (a *headAppender) Commit() error {
} }
func (a *headAppender) Rollback() error { func (a *headAppender) Rollback() error {
putHeadAppendBuffer(a.samples)
a.mtx.RUnlock() a.mtx.RUnlock()
atomic.AddUint64(&a.activeWriters, ^uint64(0))
putHeadAppendBuffer(a.samples)
return nil return nil
} }