Create series with ID recorded in WAL when reading it back

This commit is contained in:
Fabian Reinartz 2017-09-19 10:20:19 +02:00
parent 7ada9cd805
commit 162a48e4f2
4 changed files with 104 additions and 9 deletions

View File

@ -26,6 +26,7 @@ import (
"time" "time"
"unsafe" "unsafe"
"github.com/go-kit/kit/log"
"github.com/pkg/errors" "github.com/pkg/errors"
promlabels "github.com/prometheus/prometheus/pkg/labels" promlabels "github.com/prometheus/prometheus/pkg/labels"
"github.com/prometheus/prometheus/pkg/textparse" "github.com/prometheus/prometheus/pkg/textparse"
@ -88,7 +89,10 @@ func (b *writeBenchmark) run() {
dir := filepath.Join(b.outPath, "storage") dir := filepath.Join(b.outPath, "storage")
st, err := tsdb.Open(dir, nil, nil, &tsdb.Options{ l := log.NewLogfmtLogger(log.NewSyncWriter(os.Stderr))
l = log.With(l, "ts", log.DefaultTimestampUTC, "caller", log.DefaultCaller)
st, err := tsdb.Open(dir, l, nil, &tsdb.Options{
WALFlushInterval: 200 * time.Millisecond, WALFlushInterval: 200 * time.Millisecond,
RetentionDuration: 15 * 24 * 60 * 60 * 1000, // 15 days in milliseconds RetentionDuration: 15 * 24 * 60 * 60 * 1000, // 15 days in milliseconds
BlockRanges: tsdb.ExponentialBlockRanges(2*60*60*1000, 5, 3), BlockRanges: tsdb.ExponentialBlockRanges(2*60*60*1000, 5, 3),

18
head.go
View File

@ -192,7 +192,11 @@ func (h *Head) ReadWAL() error {
seriesFunc := func(series []RefSeries) error { seriesFunc := func(series []RefSeries) error {
for _, s := range series { for _, s := range series {
h.getOrCreate(s.Labels.Hash(), s.Labels) h.getOrCreateWithID(s.Ref, s.Labels.Hash(), s.Labels)
if h.lastSeriesID < s.Ref {
h.lastSeriesID = s.Ref
}
} }
return nil return nil
} }
@ -203,7 +207,8 @@ func (h *Head) ReadWAL() error {
} }
ms := h.series.getByID(s.Ref) ms := h.series.getByID(s.Ref)
if ms == nil { if ms == nil {
return errors.Errorf("unknown series reference %d; abort WAL restore", s.Ref) h.logger.Log("msg", "unknown series reference in WAL", "ref", s.Ref)
continue
} }
_, chunkCreated := ms.append(s.T, s.V) _, chunkCreated := ms.append(s.T, s.V)
if chunkCreated { if chunkCreated {
@ -211,7 +216,6 @@ func (h *Head) ReadWAL() error {
h.metrics.chunks.Inc() h.metrics.chunks.Inc()
} }
} }
return nil return nil
} }
deletesFunc := func(stones []Stone) error { deletesFunc := func(stones []Stone) error {
@ -223,7 +227,6 @@ func (h *Head) ReadWAL() error {
h.tombstones.add(s.ref, itv) h.tombstones.add(s.ref, itv)
} }
} }
return nil return nil
} }
@ -846,7 +849,12 @@ func (h *Head) getOrCreate(hash uint64, lset labels.Labels) (*memSeries, bool) {
// Optimistically assume that we are the first one to create the series. // Optimistically assume that we are the first one to create the series.
id := atomic.AddUint64(&h.lastSeriesID, 1) id := atomic.AddUint64(&h.lastSeriesID, 1)
s = newMemSeries(lset, id, h.chunkRange)
return h.getOrCreateWithID(id, hash, lset)
}
func (h *Head) getOrCreateWithID(id, hash uint64, lset labels.Labels) (*memSeries, bool) {
s := newMemSeries(lset, id, h.chunkRange)
s, created := h.series.getOrSet(hash, s) s, created := h.series.getOrSet(hash, s)
if !created { if !created {

View File

@ -21,6 +21,7 @@ import (
"unsafe" "unsafe"
"github.com/pkg/errors" "github.com/pkg/errors"
"github.com/prometheus/tsdb/chunks"
"github.com/prometheus/tsdb/labels" "github.com/prometheus/tsdb/labels"
promlabels "github.com/prometheus/prometheus/pkg/labels" promlabels "github.com/prometheus/prometheus/pkg/labels"
@ -83,6 +84,82 @@ func readPrometheusLabels(fn string, n int) ([]labels.Labels, error) {
return mets, nil return mets, nil
} }
type memoryWAL struct {
nopWAL
entries []interface{}
}
func (w *memoryWAL) Reader() WALReader {
return w
}
func (w *memoryWAL) Read(series SeriesCB, samples SamplesCB, deletes DeletesCB) error {
for _, e := range w.entries {
switch v := e.(type) {
case []RefSeries:
series(v)
case []RefSample:
samples(v)
case []Stone:
deletes(v)
}
}
return nil
}
func TestHead_ReadWAL(t *testing.T) {
entries := []interface{}{
[]RefSeries{
{Ref: 10, Labels: labels.FromStrings("a", "1")},
{Ref: 11, Labels: labels.FromStrings("a", "2")},
{Ref: 100, Labels: labels.FromStrings("a", "3")},
},
[]RefSample{
{Ref: 0, T: 99, V: 1},
{Ref: 10, T: 100, V: 2},
{Ref: 100, T: 100, V: 3},
},
[]RefSeries{
{Ref: 50, Labels: labels.FromStrings("a", "4")},
},
[]RefSample{
{Ref: 10, T: 101, V: 5},
{Ref: 50, T: 101, V: 6},
},
}
wal := &memoryWAL{entries: entries}
head, err := NewHead(nil, nil, wal, 1000)
require.NoError(t, err)
require.NoError(t, head.ReadWAL())
require.Equal(t, uint64(100), head.lastSeriesID)
s10 := head.series.getByID(10)
s11 := head.series.getByID(11)
s50 := head.series.getByID(50)
s100 := head.series.getByID(100)
require.Equal(t, labels.FromStrings("a", "1"), s10.lset)
require.Equal(t, labels.FromStrings("a", "2"), s11.lset)
require.Equal(t, labels.FromStrings("a", "4"), s50.lset)
require.Equal(t, labels.FromStrings("a", "3"), s100.lset)
expandChunk := func(c chunks.Iterator) (x []sample) {
for c.Next() {
t, v := c.At()
x = append(x, sample{t: t, v: v})
}
require.NoError(t, c.Err())
return x
}
require.Equal(t, []sample{{100, 2}, {101, 5}}, expandChunk(s10.iterator(0)))
require.Equal(t, 0, len(s11.chunks))
require.Equal(t, []sample{{101, 6}}, expandChunk(s50.iterator(0)))
require.Equal(t, []sample{{100, 3}}, expandChunk(s100.iterator(0)))
}
func TestHead_Truncate(t *testing.T) { func TestHead_Truncate(t *testing.T) {
h, err := NewHead(nil, nil, nil, 1000) h, err := NewHead(nil, nil, nil, 1000)
require.NoError(t, err) require.NoError(t, err)

12
wal.go
View File

@ -824,7 +824,9 @@ func (r *walReader) Read(seriesf SeriesCB, samplesf SamplesCB, deletesf DeletesC
if err != nil { if err != nil {
return errors.Wrap(err, "decode series entry") return errors.Wrap(err, "decode series entry")
} }
seriesf(series) if err := seriesf(series); err != nil {
return err
}
cf := r.current() cf := r.current()
@ -839,7 +841,9 @@ func (r *walReader) Read(seriesf SeriesCB, samplesf SamplesCB, deletesf DeletesC
if err != nil { if err != nil {
return errors.Wrap(err, "decode samples entry") return errors.Wrap(err, "decode samples entry")
} }
samplesf(samples) if err := samplesf(samples); err != nil {
return err
}
// Update the times for the WAL segment file. // Update the times for the WAL segment file.
cf := r.current() cf := r.current()
@ -855,7 +859,9 @@ func (r *walReader) Read(seriesf SeriesCB, samplesf SamplesCB, deletesf DeletesC
if err != nil { if err != nil {
return errors.Wrap(err, "decode delete entry") return errors.Wrap(err, "decode delete entry")
} }
deletesf(stones) if err := deletesf(stones); err != nil {
return err
}
// Update the times for the WAL segment file. // Update the times for the WAL segment file.
cf := r.current() cf := r.current()