Merge pull request #109 from prometheus/fixes

Fix compactions
This commit is contained in:
Fabian Reinartz 2017-07-14 13:45:46 +02:00 committed by GitHub
commit 8d3a398113
7 changed files with 159 additions and 100 deletions

View File

@ -3,7 +3,7 @@ sudo: false
language: go
go:
- 1.8
- 1.8.x
go_import_path: github.com/prometheus/tsdb
@ -14,6 +14,6 @@ install:
- go get -t ./...
script:
- go test ./...
- go test -timeout 5m ./...

View File

@ -112,8 +112,8 @@ func (b *writeBenchmark) run(cmd *cobra.Command, args []string) {
st, err := tsdb.Open(dir, nil, nil, &tsdb.Options{
WALFlushInterval: 200 * time.Millisecond,
RetentionDuration: 2 * 24 * 60 * 60 * 1000, // 2 days in milliseconds
BlockRanges: tsdb.ExponentialBlockRanges(int64(2*time.Hour), 3, 5),
RetentionDuration: 15 * 24 * 60 * 60 * 1000, // 15 days in milliseconds
BlockRanges: tsdb.ExponentialBlockRanges(3*60*60*1000, 3, 5),
})
if err != nil {
exitWithError(err)
@ -188,6 +188,7 @@ func (b *writeBenchmark) ingestScrapes(lbls []labels.Labels, scrapeCount int) (u
}
wg.Wait()
}
fmt.Println("ingestion completed")
return total, nil
}

View File

@ -229,11 +229,24 @@ func splitByRange(ds []dirMeta, tr int64) [][]dirMeta {
for i := 0; i < len(ds); {
var group []dirMeta
var t0 int64
m := ds[i].meta
// Compute start of aligned time range of size tr closest to the current block's start.
t0 := ds[i].meta.MinTime - (ds[i].meta.MinTime % tr)
if m.MinTime >= 0 {
t0 = tr * (m.MinTime / tr)
} else {
t0 = tr * ((m.MinTime - tr + 1) / tr)
}
// Skip blocks that don't fall into the range. This can happen via mis-alignment or
// by being a multiple of the intended range.
if ds[i].meta.MinTime < t0 || ds[i].meta.MaxTime > t0+tr {
i++
continue
}
// Add all dirs to the current group that are within [t0, t0+tr].
for ; i < len(ds); i++ {
// Either the block falls into the next range or doesn't fit at all (checked above).
if ds[i].meta.MinTime < t0 || ds[i].meta.MaxTime > t0+tr {
break
}

View File

@ -14,7 +14,6 @@
package tsdb
import (
"sort"
"testing"
"github.com/stretchr/testify/require"
@ -178,58 +177,84 @@ func TestCompactionSelect(t *testing.T) {
}
func TestSplitByRange(t *testing.T) {
splitterFunc := func(ds []dirMeta, tr int64) [][]dirMeta {
rMap := make(map[int64][]dirMeta)
for _, dir := range ds {
t0 := dir.meta.MinTime - dir.meta.MinTime%tr
if intervalContains(t0, t0+tr, dir.meta.MinTime) && intervalContains(t0, t0+tr, dir.meta.MaxTime) {
rMap[t0] = append(rMap[t0], dir)
}
}
res := make([][]dirMeta, 0, len(rMap))
for _, v := range rMap {
res = append(res, v)
}
sort.Slice(res, func(i, j int) bool {
return res[i][0].meta.MinTime < res[j][0].meta.MinTime
})
return res
}
cases := []struct {
trange int64
ranges [][]int64
output [][][]int64
ranges [][2]int64
output [][][2]int64
}{
{
trange: 60,
ranges: [][]int64{{0, 10}},
ranges: [][2]int64{{0, 10}},
output: [][][2]int64{
{{0, 10}},
},
},
{
trange: 60,
ranges: [][]int64{{0, 60}},
ranges: [][2]int64{{0, 60}},
output: [][][2]int64{
{{0, 60}},
},
},
{
trange: 60,
ranges: [][]int64{{0, 10}, {30, 60}},
ranges: [][2]int64{{0, 10}, {9, 15}, {30, 60}},
output: [][][2]int64{
{{0, 10}, {9, 15}, {30, 60}},
},
},
{
trange: 60,
ranges: [][]int64{{0, 10}, {60, 90}},
ranges: [][2]int64{{70, 90}, {125, 130}, {130, 180}, {1000, 1001}},
output: [][][2]int64{
{{70, 90}},
{{125, 130}, {130, 180}},
{{1000, 1001}},
},
},
// Mis-aligned or too-large blocks are ignored.
{
trange: 60,
ranges: [][2]int64{{50, 70}, {70, 80}},
output: [][][2]int64{
{{70, 80}},
},
},
{
trange: 72,
ranges: [][2]int64{{0, 144}, {144, 216}, {216, 288}},
output: [][][2]int64{
{{144, 216}},
{{216, 288}},
},
},
// Various awkward edge cases easy to hit with negative numbers.
{
trange: 60,
ranges: [][2]int64{{-10, -5}},
output: [][][2]int64{
{{-10, -5}},
},
},
{
trange: 60,
ranges: [][]int64{{0, 10}, {20, 30}, {90, 120}},
ranges: [][2]int64{{-60, -50}, {-10, -5}},
output: [][][2]int64{
{{-60, -50}, {-10, -5}},
},
},
{
trange: 60,
ranges: [][]int64{{0, 10}, {59, 60}, {60, 120}, {120, 180}, {190, 200}, {200, 210}, {220, 239}},
ranges: [][2]int64{{-60, -50}, {-10, -5}, {0, 15}},
output: [][][2]int64{
{{-60, -50}, {-10, -5}},
{{0, 15}},
},
},
}
for _, c := range cases {
// Transform input range tuples into dirMetas.
blocks := make([]dirMeta, 0, len(c.ranges))
for _, r := range c.ranges {
blocks = append(blocks, dirMeta{
@ -240,6 +265,16 @@ func TestSplitByRange(t *testing.T) {
})
}
require.Equal(t, splitterFunc(blocks, c.trange), splitByRange(blocks, c.trange))
// Transform output range tuples into dirMetas.
exp := make([][]dirMeta, len(c.output))
for i, group := range c.output {
for _, r := range group {
exp[i] = append(exp[i], dirMeta{
meta: &BlockMeta{MinTime: r[0], MaxTime: r[1]},
})
}
}
require.Equal(t, exp, splitByRange(blocks, c.trange))
}
}

114
db.go
View File

@ -99,14 +99,13 @@ type DB struct {
metrics *dbMetrics
opts *Options
// Mutex for that must be held when modifying the general
// block layout.
// Mutex that must be held when modifying the general block layout.
mtx sync.RWMutex
blocks []Block
// Mutex that must be held when modifying just the head blocks
// or the general layout.
// Must never be held when acquiring a blocks's mutex!
// mtx must be held before acquiring.
headmtx sync.RWMutex
heads []headBlock
@ -118,7 +117,7 @@ type DB struct {
// cmtx is used to control compactions and deletions.
cmtx sync.Mutex
compacting bool
compactionsEnabled bool
}
type dbMetrics struct {
@ -203,7 +202,7 @@ func Open(dir string, l log.Logger, r prometheus.Registerer, opts *Options) (db
compactc: make(chan struct{}, 1),
donec: make(chan struct{}),
stopc: make(chan struct{}),
compacting: true,
compactionsEnabled: true,
}
db.metrics = newDBMetrics(db, r)
@ -325,37 +324,59 @@ func headFullness(h headBlock) float64 {
return a / b
}
// appendableHeads returns a copy of a slice of HeadBlocks that can still be appended to.
func (db *DB) appendableHeads() (r []headBlock) {
switch l := len(db.heads); l {
case 0:
case 1:
r = append(r, db.heads[0])
default:
if headFullness(db.heads[l-1]) < 0.5 {
r = append(r, db.heads[l-2])
}
r = append(r, db.heads[l-1])
}
return r
}
func (db *DB) completedHeads() (r []headBlock) {
db.mtx.RLock()
defer db.mtx.RUnlock()
db.headmtx.RLock()
defer db.headmtx.RUnlock()
if len(db.heads) < 2 {
return nil
}
// Select all old heads unless they still have pending appenders.
for _, h := range db.heads[:len(db.heads)-2] {
if h.ActiveWriters() > 0 {
return r
}
r = append(r, h)
}
// Add the 2nd last head if the last head is more than 50% filled.
// Compacting it early allows us to free its memory before allocating
// more for the next block and thus reduces spikes.
if h2 := db.heads[len(db.heads)-2]; headFullness(h2) >= 0.5 && h2.ActiveWriters() == 0 {
r = append(r, h2)
}
return r
}
func (db *DB) compact() (changes bool, err error) {
db.cmtx.Lock()
defer db.cmtx.Unlock()
db.headmtx.RLock()
if !db.compactionsEnabled {
return false, nil
}
// Check whether we have pending head blocks that are ready to be persisted.
// They have the highest priority.
var singles []Block
// Collect head blocks that are ready for compaction. Write them after
// returning the lock to not block Appenders.
// Selected blocks are semantically ensured to not be written to afterwards
// by appendable().
if len(db.heads) > 1 {
f := headFullness(db.heads[len(db.heads)-1])
for _, h := range db.heads[:len(db.heads)-1] {
// Blocks that won't be appendable when instantiating a new appender
// might still have active appenders on them.
// Abort at the first one we encounter.
if h.ActiveWriters() > 0 || f < 0.5 {
break
}
singles = append(singles, h)
}
}
db.headmtx.RUnlock()
for _, h := range singles {
for _, h := range db.completedHeads() {
select {
case <-db.stopc:
return changes, nil
@ -561,30 +582,30 @@ func (db *DB) Close() error {
// DisableCompactions disables compactions.
func (db *DB) DisableCompactions() {
if db.compacting {
db.cmtx.Lock()
db.compacting = false
defer db.cmtx.Unlock()
db.compactionsEnabled = false
db.logger.Log("msg", "compactions disabled")
}
}
// EnableCompactions enables compactions.
func (db *DB) EnableCompactions() {
if !db.compacting {
db.cmtx.Unlock()
db.compacting = true
db.cmtx.Lock()
defer db.cmtx.Unlock()
db.compactionsEnabled = true
db.logger.Log("msg", "compactions enabled")
}
}
// Snapshot writes the current data to the directory.
func (db *DB) Snapshot(dir string) error {
db.mtx.Lock() // To block any appenders.
defer db.mtx.Unlock()
db.cmtx.Lock()
defer db.cmtx.Unlock()
db.mtx.Lock() // To block any appenders.
defer db.mtx.Unlock()
blocks := db.blocks[:]
for _, b := range blocks {
db.logger.Log("msg", "snapshotting block", "block", b)
@ -677,7 +698,7 @@ func (a *dbAppender) appenderAt(t int64) (*metaAppender, error) {
}
var hb headBlock
for _, h := range a.db.appendable() {
for _, h := range a.db.appendableHeads() {
m := h.Meta()
if intervalContains(m.MinTime, m.MaxTime-1, t) {
@ -789,6 +810,7 @@ func (a *dbAppender) Rollback() error {
func (db *DB) Delete(mint, maxt int64, ms ...labels.Matcher) error {
db.cmtx.Lock()
defer db.cmtx.Unlock()
db.mtx.Lock()
defer db.mtx.Unlock()
@ -809,18 +831,6 @@ func (db *DB) Delete(mint, maxt int64, ms ...labels.Matcher) error {
return nil
}
// appendable returns a copy of a slice of HeadBlocks that can still be appended to.
func (db *DB) appendable() (r []headBlock) {
switch len(db.heads) {
case 0:
case 1:
r = append(r, db.heads[0])
default:
r = append(r, db.heads[len(db.heads)-2:]...)
}
return r
}
func intervalOverlap(amin, amax, bmin, bmax int64) bool {
// Checks Overlap: http://stackoverflow.com/questions/3269434/
return amin <= bmax && bmin <= amax

View File

@ -60,10 +60,10 @@ func TestDataAvailableOnlyAfterCommit(t *testing.T) {
require.NoError(t, err)
querier := db.Querier(0, 1)
defer querier.Close()
seriesSet, err := readSeriesSet(querier.Select(labels.NewEqualMatcher("foo", "bar")))
require.NoError(t, err)
require.Equal(t, seriesSet, map[string][]sample{})
require.NoError(t, querier.Close())
err = app.Commit()
require.NoError(t, err)

View File

@ -53,8 +53,8 @@ type querier struct {
blocks []Querier
}
// Querier returns a new querier over the data partition for the given
// time range.
// Querier returns a new querier over the data partition for the given time range.
// A goroutine must not handle more than one open Querier.
func (s *DB) Querier(mint, maxt int64) Querier {
s.mtx.RLock()