Make WAL for HeadBlock composeable.

This commit is contained in:
Fabian Reinartz 2017-05-13 18:14:18 +02:00
parent 4862b261d0
commit 8b51b7e2be
3 changed files with 72 additions and 50 deletions

25
db.go
View File

@ -418,7 +418,7 @@ func (db *DB) reloadBlocks() error {
if meta.Compaction.Generation == 0 {
if !ok {
b, err = OpenHeadBlock(dirs[i], db.logger)
b, err = db.openHeadBlock(dirs[i])
if err != nil {
return errors.Wrapf(err, "load head at %s", dirs[i])
}
@ -709,6 +709,24 @@ func (db *DB) blocksForInterval(mint, maxt int64) []Block {
return bs
}
// openHeadBlock opens the head block at dir.
func (db *DB) openHeadBlock(dir string) (*HeadBlock, error) {
var (
wdir = filepath.Join(dir, "wal")
l = log.With(db.logger, "wal", wdir)
)
wal, err := OpenSegmentWAL(wdir, l, 5*time.Second)
if err != nil {
return nil, errors.Wrap(err, "open WAL %s")
}
h, err := OpenHeadBlock(dir, log.With(db.logger, "block", dir), wal)
if err != nil {
return nil, errors.Wrapf(err, "open head block %s", dir)
}
return h, nil
}
// cut starts a new head block to append to. The completed head block
// will still be appendable for the configured grace period.
func (db *DB) cut(mint int64) (headBlock, error) {
@ -718,7 +736,10 @@ func (db *DB) cut(mint int64) (headBlock, error) {
if err != nil {
return nil, err
}
newHead, err := CreateHeadBlock(dir, seq, db.logger, mint, maxt)
if err := TouchHeadBlock(dir, seq, mint, maxt); err != nil {
return nil, errors.Wrapf(err, "touch head block %s", dir)
}
newHead, err := db.openHeadBlock(dir)
if err != nil {
return nil, err
}

37
head.go
View File

@ -69,20 +69,21 @@ type HeadBlock struct {
meta BlockMeta
}
// CreateHeadBlock creates a new head block in dir that holds samples in the range [mint,maxt).
func CreateHeadBlock(dir string, seq int, l log.Logger, mint, maxt int64) (*HeadBlock, error) {
// TouchHeadBlock atomically touches a new head block in dir for
// samples in the range [mint,maxt).
func TouchHeadBlock(dir string, seq int, mint, maxt int64) error {
// Make head block creation appear atomic.
tmp := dir + ".tmp"
if err := os.MkdirAll(tmp, 0777); err != nil {
return nil, err
return err
}
entropy := rand.New(rand.NewSource(time.Now().UnixNano()))
ulid, err := ulid.New(ulid.Now(), entropy)
if err != nil {
return nil, err
return err
}
if err := writeMetaFile(tmp, &BlockMeta{
@ -91,20 +92,13 @@ func CreateHeadBlock(dir string, seq int, l log.Logger, mint, maxt int64) (*Head
MinTime: mint,
MaxTime: maxt,
}); err != nil {
return nil, err
return err
}
if err := renameFile(tmp, dir); err != nil {
return nil, err
}
return OpenHeadBlock(dir, l)
return renameFile(tmp, dir)
}
// OpenHeadBlock opens the head block in dir.
func OpenHeadBlock(dir string, l log.Logger) (*HeadBlock, error) {
wal, err := OpenSegmentWAL(dir, log.With(l, "component", "wal"), 5*time.Second)
if err != nil {
return nil, err
}
func OpenHeadBlock(dir string, l log.Logger, wal WAL) (*HeadBlock, error) {
meta, err := readMetaFile(dir)
if err != nil {
return nil, err
@ -119,8 +113,11 @@ func OpenHeadBlock(dir string, l log.Logger) (*HeadBlock, error) {
postings: &memPostings{m: make(map[term][]uint32)},
meta: *meta,
}
return h, h.init()
}
r := wal.Reader()
func (h *HeadBlock) init() error {
r := h.wal.Reader()
for r.Next() {
series, samples := r.At()
@ -131,21 +128,17 @@ func OpenHeadBlock(dir string, l log.Logger) (*HeadBlock, error) {
}
for _, s := range samples {
if int(s.Ref) >= len(h.series) {
return nil, errors.Errorf("unknown series reference %d (max %d); abort WAL restore", s.Ref, len(h.series))
return errors.Errorf("unknown series reference %d (max %d); abort WAL restore", s.Ref, len(h.series))
}
h.series[s.Ref].append(s.T, s.V)
if !h.inBounds(s.T) {
return nil, errors.Wrap(ErrOutOfBounds, "consume WAL")
return errors.Wrap(ErrOutOfBounds, "consume WAL")
}
h.meta.Stats.NumSamples++
}
}
if err := r.Err(); err != nil {
return nil, errors.Wrap(err, "consume WAL")
}
return h, nil
return errors.Wrap(r.Err(), "consume WAL")
}
// inBounds returns true if the given timestamp is within the valid

View File

@ -20,6 +20,7 @@ import (
"os"
"sort"
"testing"
"time"
"unsafe"
"github.com/pkg/errors"
@ -30,6 +31,19 @@ import (
"github.com/stretchr/testify/require"
)
// createTestHeadBlock creates a new head block with a SegmentWAL.
func createTestHeadBlock(t testing.TB, dir string, mint, maxt int64) *HeadBlock {
err := TouchHeadBlock(dir, 0, mint, maxt)
require.NoError(t, err)
wal, err := OpenSegmentWAL(dir, nil, 5*time.Second)
require.NoError(t, err)
h, err := OpenHeadBlock(dir, nil, wal)
require.NoError(t, err)
return h
}
func BenchmarkCreateSeries(b *testing.B) {
lbls, err := readPrometheusLabels("cmd/tsdb/testdata.1m", 1e6)
require.NoError(b, err)
@ -39,8 +53,7 @@ func BenchmarkCreateSeries(b *testing.B) {
require.NoError(b, err)
defer os.RemoveAll(dir)
h, err := CreateHeadBlock(dir, 0, nil, 0, 1)
require.NoError(b, err)
h := createTestHeadBlock(b, dir, 0, 1)
b.ReportAllocs()
b.ResetTimer()
@ -90,14 +103,13 @@ func readPrometheusLabels(fn string, n int) ([]labels.Labels, error) {
}
func TestAmendDatapointCausesError(t *testing.T) {
tmpdir, _ := ioutil.TempDir("", "test")
defer os.RemoveAll(tmpdir)
dir, _ := ioutil.TempDir("", "test")
defer os.RemoveAll(dir)
hb, err := CreateHeadBlock(tmpdir+"/hb", 0, nil, 0, 1000)
require.NoError(t, err, "Error creating head block")
hb := createTestHeadBlock(t, dir, 0, 1000)
app := hb.Appender()
_, err = app.Add(labels.Labels{}, 0, 0)
_, err := app.Add(labels.Labels{}, 0, 0)
require.NoError(t, err, "Failed to add sample")
require.NoError(t, app.Commit(), "Unexpected error committing appender")
@ -107,14 +119,13 @@ func TestAmendDatapointCausesError(t *testing.T) {
}
func TestDuplicateNaNDatapointNoAmendError(t *testing.T) {
tmpdir, _ := ioutil.TempDir("", "test")
defer os.RemoveAll(tmpdir)
dir, _ := ioutil.TempDir("", "test")
defer os.RemoveAll(dir)
hb, err := CreateHeadBlock(tmpdir+"/hb", 0, nil, 0, 1000)
require.NoError(t, err, "Error creating head block")
hb := createTestHeadBlock(t, dir, 0, 1000)
app := hb.Appender()
_, err = app.Add(labels.Labels{}, 0, math.NaN())
_, err := app.Add(labels.Labels{}, 0, math.NaN())
require.NoError(t, err, "Failed to add sample")
require.NoError(t, app.Commit(), "Unexpected error committing appender")
@ -124,14 +135,13 @@ func TestDuplicateNaNDatapointNoAmendError(t *testing.T) {
}
func TestNonDuplicateNaNDatapointsCausesAmendError(t *testing.T) {
tmpdir, _ := ioutil.TempDir("", "test")
defer os.RemoveAll(tmpdir)
dir, _ := ioutil.TempDir("", "test")
defer os.RemoveAll(dir)
hb, err := CreateHeadBlock(tmpdir+"/hb", 0, nil, 0, 1000)
require.NoError(t, err, "Error creating head block")
hb := createTestHeadBlock(t, dir, 0, 1000)
app := hb.Appender()
_, err = app.Add(labels.Labels{}, 0, math.Float64frombits(0x7ff0000000000001))
_, err := app.Add(labels.Labels{}, 0, math.Float64frombits(0x7ff0000000000001))
require.NoError(t, err, "Failed to add sample")
require.NoError(t, app.Commit(), "Unexpected error committing appender")
@ -141,15 +151,14 @@ func TestNonDuplicateNaNDatapointsCausesAmendError(t *testing.T) {
}
func TestSkippingInvalidValuesInSameTxn(t *testing.T) {
tmpdir, _ := ioutil.TempDir("", "test")
defer os.RemoveAll(tmpdir)
dir, _ := ioutil.TempDir("", "test")
defer os.RemoveAll(dir)
hb, err := CreateHeadBlock(tmpdir+"/hb", 0, nil, 0, 1000)
require.NoError(t, err)
hb := createTestHeadBlock(t, dir, 0, 1000)
// Append AmendedValue.
app := hb.Appender()
_, err = app.Add(labels.Labels{{"a", "b"}}, 0, 1)
_, err := app.Add(labels.Labels{{"a", "b"}}, 0, 1)
require.NoError(t, err)
_, err = app.Add(labels.Labels{{"a", "b"}}, 0, 2)
require.NoError(t, err)
@ -243,11 +252,10 @@ func TestHeadBlock_e2e(t *testing.T) {
seriesMap[labels.New(l...).String()] = []sample{}
}
tmpdir, _ := ioutil.TempDir("", "test")
defer os.RemoveAll(tmpdir)
dir, _ := ioutil.TempDir("", "test")
defer os.RemoveAll(dir)
hb, err := CreateHeadBlock(tmpdir+"/hb", 0, nil, minTime, maxTime)
require.NoError(t, err)
hb := createTestHeadBlock(t, dir, minTime, maxTime)
app := hb.Appender()
for _, l := range lbls {