diff --git a/checkpoint.go b/checkpoint.go new file mode 100644 index 0000000000..d988d35615 --- /dev/null +++ b/checkpoint.go @@ -0,0 +1,279 @@ +// Copyright 2018 The Prometheus Authors + +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package tsdb + +import ( + "fmt" + "io" + "io/ioutil" + "os" + "path/filepath" + "strconv" + "strings" + + "github.com/go-kit/kit/log" + "github.com/go-kit/kit/log/level" + "github.com/pkg/errors" + "github.com/prometheus/tsdb/fileutil" + "github.com/prometheus/tsdb/wal" +) + +// CheckpointStats returns stats about a created checkpoint. +type CheckpointStats struct { + DroppedSeries int + DroppedSamples int + DroppedTombstones int + TotalSeries int // Processed series including dropped ones. + TotalSamples int // Processed samples including dropped ones. + TotalTombstones int // Processed tombstones including dropped ones. +} + +// LastCheckpoint returns the directory name of the most recent checkpoint. +// If dir does not contain any checkpoints, ErrNotFound is returned. +func LastCheckpoint(dir string) (string, int, error) { + files, err := ioutil.ReadDir(dir) + if err != nil { + return "", 0, err + } + // Traverse list backwards since there may be multiple checkpoints left. 
+ for i := len(files) - 1; i >= 0; i-- { + fi := files[i] + + if !strings.HasPrefix(fi.Name(), checkpointPrefix) { + continue + } + if !fi.IsDir() { + return "", 0, errors.Errorf("checkpoint %s is not a directory", fi.Name()) + } + k, err := strconv.Atoi(fi.Name()[len(checkpointPrefix):]) + if err != nil { + continue + } + return fi.Name(), k, nil + } + return "", 0, ErrNotFound +} + +// DeleteCheckpoints deletes all checkpoints in dir that have an index +// below n. +func DeleteCheckpoints(dir string, n int) error { + var errs MultiError + + files, err := ioutil.ReadDir(dir) + if err != nil { + return err + } + for _, fi := range files { + if !strings.HasPrefix(fi.Name(), checkpointPrefix) { + continue + } + k, err := strconv.Atoi(fi.Name()[len(checkpointPrefix):]) + if err != nil || k >= n { + continue + } + if err := os.RemoveAll(filepath.Join(dir, fi.Name())); err != nil { + errs.Add(err) + } + } + return errs.Err() +} + +const checkpointPrefix = "checkpoint." + +// Checkpoint creates a compacted checkpoint of segments in range [m, n] in the given WAL. +// It includes the most recent checkpoint if it exists. +// All series not satisfying keep and samples below mint are dropped. +// +// The checkpoint is stored in a directory named checkpoint.N in the same +// segmented format as the original WAL itself. +// This makes it easy to read it through the WAL package and concatenate +// it with the original WAL. +// +// Non-critical errors are logged and not returned. 
+func Checkpoint(logger log.Logger, w *wal.WAL, m, n int, keep func(id uint64) bool, mint int64) (*CheckpointStats, error) { + if logger == nil { + logger = log.NewNopLogger() + } + stats := &CheckpointStats{} + + var sr io.Reader + { + lastFn, k, err := LastCheckpoint(w.Dir()) + if err != nil && err != ErrNotFound { + return nil, errors.Wrap(err, "find last checkpoint") + } + if err == nil { + if m > k+1 { + return nil, errors.New("unexpected gap to last checkpoint") + } + // Ignore WAL files below the checkpoint. They shouldn't exist to begin with. + m = k + 1 + + last, err := wal.NewSegmentsReader(filepath.Join(w.Dir(), lastFn)) + if err != nil { + return nil, errors.Wrap(err, "open last checkpoint") + } + defer last.Close() + sr = last + } + + segsr, err := wal.NewSegmentsRangeReader(w.Dir(), m, n) + if err != nil { + return nil, errors.Wrap(err, "create segment reader") + } + defer segsr.Close() + + if sr != nil { + sr = io.MultiReader(sr, segsr) + } else { + sr = segsr + } + } + + cpdir := filepath.Join(w.Dir(), fmt.Sprintf("checkpoint.%06d", n)) + cpdirtmp := cpdir + ".tmp" + + if err := os.MkdirAll(cpdirtmp, 0777); err != nil { + return nil, errors.Wrap(err, "create checkpoint dir") + } + cp, err := wal.New(nil, nil, cpdirtmp) + if err != nil { + return nil, errors.Wrap(err, "open checkpoint") + } + + r := wal.NewReader(sr) + + var ( + series []RefSeries + samples []RefSample + tstones []Stone + dec RecordDecoder + enc RecordEncoder + buf []byte + recs [][]byte + ) + for r.Next() { + series, samples, tstones = series[:0], samples[:0], tstones[:0] + + // We don't reset the buffer since we batch up multiple records + // before writing them to the checkpoint. + // Remember where the record for this iteration starts. + start := len(buf) + rec := r.Record() + + switch dec.Type(rec) { + case RecordSeries: + series, err = dec.Series(rec, series) + if err != nil { + return nil, errors.Wrap(err, "decode series") + } + // Drop irrelevant series in place. 
+ repl := series[:0] + for _, s := range series { + if keep(s.Ref) { + repl = append(repl, s) + } + } + if len(repl) > 0 { + buf = enc.Series(repl, buf) + } + stats.TotalSeries += len(series) + stats.DroppedSeries += len(series) - len(repl) + + case RecordSamples: + samples, err = dec.Samples(rec, samples) + if err != nil { + return nil, errors.Wrap(err, "decode samples") + } + // Drop irrelevant samples in place. + repl := samples[:0] + for _, s := range samples { + if s.T >= mint { + repl = append(repl, s) + } + } + if len(repl) > 0 { + buf = enc.Samples(repl, buf) + } + stats.TotalSamples += len(samples) + stats.DroppedSamples += len(samples) - len(repl) + + case RecordTombstones: + tstones, err = dec.Tombstones(rec, tstones) + if err != nil { + return nil, errors.Wrap(err, "decode deletes") + } + // Drop irrelevant tombstones in place. + repl := tstones[:0] + for _, s := range tstones { + for _, iv := range s.intervals { + if iv.Maxt >= mint { + repl = append(repl, s) + break + } + } + } + if len(repl) > 0 { + buf = enc.Tombstones(repl, buf) + } + stats.TotalTombstones += len(tstones) + stats.DroppedTombstones += len(tstones) - len(repl) + + default: + return nil, errors.New("invalid record type") + } + if len(buf[start:]) == 0 { + continue // All contents discarded. + } + recs = append(recs, buf[start:]) + + // Flush records in 1 MB increments. + if len(buf) > 1*1024*1024 { + if err := cp.Log(recs...); err != nil { + return nil, errors.Wrap(err, "flush records") + } + buf, recs = buf[:0], recs[:0] + } + } + // If we hit any corruption during checkpointing, repairing is not an option. + // The head won't know which series records are lost. + if r.Err() != nil { + return nil, errors.Wrap(r.Err(), "read segments") + } + + // Flush remaining records. 
+ if err := cp.Log(recs...); err != nil { + return nil, errors.Wrap(err, "flush records") + } + if err := cp.Close(); err != nil { + return nil, errors.Wrap(err, "close checkpoint") + } + if err := fileutil.Replace(cpdirtmp, cpdir); err != nil { + return nil, errors.Wrap(err, "rename checkpoint directory") + } + if err := w.Truncate(n + 1); err != nil { + // If truncating fails, we'll just try again at the next checkpoint. + // Leftover segments will just be ignored in the future if there's a checkpoint + // that supersedes them. + level.Error(logger).Log("msg", "truncating segments failed", "err", err) + } + if err := DeleteCheckpoints(w.Dir(), n); err != nil { + // Leftover old checkpoints do not cause problems down the line beyond + // occupying disk space. + // They will just be ignored since a higher checkpoint exists. + level.Error(logger).Log("msg", "delete old checkpoints", "err", err) + } + return stats, nil +} diff --git a/checkpoint_test.go b/checkpoint_test.go new file mode 100644 index 0000000000..daa54df194 --- /dev/null +++ b/checkpoint_test.go @@ -0,0 +1,180 @@ +// Copyright 2018 The Prometheus Authors + +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+ +package tsdb + +import ( + "io/ioutil" + "os" + "path/filepath" + "testing" + + "github.com/prometheus/tsdb/fileutil" + "github.com/prometheus/tsdb/labels" + "github.com/prometheus/tsdb/testutil" + "github.com/prometheus/tsdb/wal" +) + +func TestLastCheckpoint(t *testing.T) { + dir, err := ioutil.TempDir("", "test_checkpoint") + testutil.Ok(t, err) + defer os.RemoveAll(dir) + + s, k, err := LastCheckpoint(dir) + testutil.Equals(t, ErrNotFound, err) + + testutil.Ok(t, os.MkdirAll(filepath.Join(dir, "checkpoint.0000"), 0777)) + s, k, err = LastCheckpoint(dir) + testutil.Ok(t, err) + testutil.Equals(t, "checkpoint.0000", s) + testutil.Equals(t, 0, k) + + testutil.Ok(t, os.MkdirAll(filepath.Join(dir, "checkpoint.xyz"), 0777)) + s, k, err = LastCheckpoint(dir) + testutil.Ok(t, err) + testutil.Equals(t, "checkpoint.0000", s) + testutil.Equals(t, 0, k) + + testutil.Ok(t, os.MkdirAll(filepath.Join(dir, "checkpoint.1"), 0777)) + s, k, err = LastCheckpoint(dir) + testutil.Ok(t, err) + testutil.Equals(t, "checkpoint.1", s) + testutil.Equals(t, 1, k) + + testutil.Ok(t, os.MkdirAll(filepath.Join(dir, "checkpoint.1000"), 0777)) + s, k, err = LastCheckpoint(dir) + testutil.Ok(t, err) + testutil.Equals(t, "checkpoint.1000", s) + testutil.Equals(t, 1000, k) +} + +func TestDeleteCheckpoints(t *testing.T) { + dir, err := ioutil.TempDir("", "test_checkpoint") + testutil.Ok(t, err) + defer os.RemoveAll(dir) + + testutil.Ok(t, DeleteCheckpoints(dir, 0)) + + testutil.Ok(t, os.MkdirAll(filepath.Join(dir, "checkpoint.00"), 0777)) + testutil.Ok(t, os.MkdirAll(filepath.Join(dir, "checkpoint.01"), 0777)) + testutil.Ok(t, os.MkdirAll(filepath.Join(dir, "checkpoint.02"), 0777)) + testutil.Ok(t, os.MkdirAll(filepath.Join(dir, "checkpoint.03"), 0777)) + + testutil.Ok(t, DeleteCheckpoints(dir, 2)) + + files, err := fileutil.ReadDir(dir) + testutil.Ok(t, err) + testutil.Equals(t, []string{"checkpoint.02", "checkpoint.03"}, files) +} + +func TestCheckpoint(t *testing.T) { + dir, err := 
ioutil.TempDir("", "test_checkpoint") + testutil.Ok(t, err) + defer os.RemoveAll(dir) + + var enc RecordEncoder + // Create a dummy segment to bump the initial number. + seg, err := wal.CreateSegment(dir, 100) + testutil.Ok(t, err) + testutil.Ok(t, seg.Close()) + + // Manually create checkpoint for 99 and earlier. + w, err := wal.New(nil, nil, filepath.Join(dir, "checkpoint.0099")) + testutil.Ok(t, err) + + // Add some data we expect to be around later. + err = w.Log(enc.Series([]RefSeries{ + {Ref: 0, Labels: labels.FromStrings("a", "b", "c", "0")}, + {Ref: 1, Labels: labels.FromStrings("a", "b", "c", "1")}, + }, nil)) + testutil.Ok(t, err) + testutil.Ok(t, w.Close()) + + // Start a WAL and write records to it as usual. + w, err = wal.NewSize(nil, nil, dir, 64*1024) + testutil.Ok(t, err) + + var last int64 + for i := 0; ; i++ { + _, n, err := w.Segments() + testutil.Ok(t, err) + if n >= 106 { + break + } + // Write some series initially. + if i == 0 { + b := enc.Series([]RefSeries{ + {Ref: 2, Labels: labels.FromStrings("a", "b", "c", "2")}, + {Ref: 3, Labels: labels.FromStrings("a", "b", "c", "3")}, + {Ref: 4, Labels: labels.FromStrings("a", "b", "c", "4")}, + {Ref: 5, Labels: labels.FromStrings("a", "b", "c", "5")}, + }, nil) + testutil.Ok(t, w.Log(b)) + } + // Write samples until the WAL has enough segments. + // Make them have drifting timestamps within a record to see that they + // get filtered properly. + b := enc.Samples([]RefSample{ + {Ref: 0, T: last, V: float64(i)}, + {Ref: 1, T: last + 10000, V: float64(i)}, + {Ref: 2, T: last + 20000, V: float64(i)}, + {Ref: 3, T: last + 30000, V: float64(i)}, + }, nil) + testutil.Ok(t, w.Log(b)) + + last += 100 + } + testutil.Ok(t, w.Close()) + + _, err = Checkpoint(nil, w, 100, 106, func(x uint64) bool { + return x%2 == 0 + }, last/2) + testutil.Ok(t, err) + + // Only the new checkpoint should be left. 
+ files, err := fileutil.ReadDir(dir) + testutil.Ok(t, err) + testutil.Equals(t, 1, len(files)) + testutil.Equals(t, "checkpoint.000106", files[0]) + + sr, err := wal.NewSegmentsReader(filepath.Join(dir, "checkpoint.000106")) + testutil.Ok(t, err) + defer sr.Close() + + var dec RecordDecoder + var series []RefSeries + r := wal.NewReader(sr) + + for r.Next() { + rec := r.Record() + + switch dec.Type(rec) { + case RecordSeries: + series, err = dec.Series(rec, series) + testutil.Ok(t, err) + case RecordSamples: + samples, err := dec.Samples(rec, nil) + testutil.Ok(t, err) + for _, s := range samples { + testutil.Assert(t, s.T >= last/2, "sample with wrong timestamp") + } + } + } + testutil.Ok(t, r.Err()) + testutil.Equals(t, []RefSeries{ + {Ref: 0, Labels: labels.FromStrings("a", "b", "c", "0")}, + {Ref: 2, Labels: labels.FromStrings("a", "b", "c", "2")}, + {Ref: 4, Labels: labels.FromStrings("a", "b", "c", "4")}, + }, series) +} diff --git a/db.go b/db.go index fcfbeeeb29..e6a0a74b4b 100644 --- a/db.go +++ b/db.go @@ -37,6 +37,7 @@ import ( "github.com/prometheus/tsdb/chunkenc" "github.com/prometheus/tsdb/fileutil" "github.com/prometheus/tsdb/labels" + "github.com/prometheus/tsdb/wal" "golang.org/x/sync/errgroup" ) @@ -191,6 +192,10 @@ func Open(dir string, l log.Logger, r prometheus.Registerer, opts *Options) (db if err := repairBadIndexVersion(l, dir); err != nil { return nil, err } + // Migrate old WAL. 
+ if err := MigrateWAL(l, filepath.Join(dir, "wal")); err != nil { + return nil, errors.Wrap(err, "migrate WAL") + } db = &DB{ dir: dir, @@ -221,18 +226,18 @@ func Open(dir string, l log.Logger, r prometheus.Registerer, opts *Options) (db return nil, errors.Wrap(err, "create leveled compactor") } - wal, err := OpenSegmentWAL(filepath.Join(dir, "wal"), l, opts.WALFlushInterval, r) + wlog, err := wal.New(l, r, filepath.Join(dir, "wal")) if err != nil { return nil, err } - db.head, err = NewHead(r, l, wal, opts.BlockRanges[0]) + db.head, err = NewHead(r, l, wlog, opts.BlockRanges[0]) if err != nil { return nil, err } if err := db.reload(); err != nil { return nil, err } - if err := db.head.ReadWAL(); err != nil { + if err := db.head.Init(); err != nil { return nil, errors.Wrap(err, "read WAL") } @@ -501,7 +506,6 @@ func (db *DB) reload() (err error) { sort.Slice(blocks, func(i, j int) bool { return blocks[i].Meta().MinTime < blocks[j].Meta().MinTime }) - if err := validateBlockSequence(blocks); err != nil { return errors.Wrap(err, "invalid block sequence") } @@ -596,10 +600,6 @@ func OverlappingBlocks(bm []BlockMeta) Overlaps { if len(bm) <= 1 { return nil } - sort.Slice(bm, func(i, j int) bool { - return bm[i].MinTime < bm[j].MinTime - }) - var ( overlaps [][]BlockMeta diff --git a/db_test.go b/db_test.go index 205b0f8440..9c175118eb 100644 --- a/db_test.go +++ b/db_test.go @@ -19,6 +19,7 @@ import ( "math" "math/rand" "os" + "path" "path/filepath" "sort" "testing" @@ -30,6 +31,7 @@ import ( "github.com/prometheus/tsdb/index" "github.com/prometheus/tsdb/labels" "github.com/prometheus/tsdb/testutil" + "github.com/prometheus/tsdb/wal" ) func openTestDB(t testing.TB, opts *Options) (db *DB, close func()) { @@ -1025,33 +1027,41 @@ func TestOverlappingBlocksDetectsAllOverlaps(t *testing.T) { testutil.Assert(t, len(OverlappingBlocks(metas)) == 0, "we found unexpected overlaps") - // Add overlapping blocks. + // Add overlapping blocks. 
We've to establish order again since we aren't interested + // in trivial overlaps caused by unorderedness. + add := func(ms ...BlockMeta) []BlockMeta { + repl := append(append([]BlockMeta{}, metas...), ms...) + sort.Slice(repl, func(i, j int) bool { + return repl[i].MinTime < repl[j].MinTime + }) + return repl + } // o1 overlaps with 10-20. o1 := BlockMeta{MinTime: 15, MaxTime: 17} testutil.Equals(t, Overlaps{ {Min: 15, Max: 17}: {metas[1], o1}, - }, OverlappingBlocks(append(metas, o1))) + }, OverlappingBlocks(add(o1))) // o2 overlaps with 20-30 and 30-40. o2 := BlockMeta{MinTime: 21, MaxTime: 31} testutil.Equals(t, Overlaps{ {Min: 21, Max: 30}: {metas[2], o2}, {Min: 30, Max: 31}: {o2, metas[3]}, - }, OverlappingBlocks(append(metas, o2))) + }, OverlappingBlocks(add(o2))) // o3a and o3b overlaps with 30-40 and each other. o3a := BlockMeta{MinTime: 33, MaxTime: 39} o3b := BlockMeta{MinTime: 34, MaxTime: 36} testutil.Equals(t, Overlaps{ {Min: 34, Max: 36}: {metas[3], o3a, o3b}, - }, OverlappingBlocks(append(metas, o3a, o3b))) + }, OverlappingBlocks(add(o3a, o3b))) // o4 is 1:1 overlap with 50-60. o4 := BlockMeta{MinTime: 50, MaxTime: 60} testutil.Equals(t, Overlaps{ {Min: 50, Max: 60}: {metas[5], o4}, - }, OverlappingBlocks(append(metas, o4))) + }, OverlappingBlocks(add(o4))) // o5 overlaps with 60-70, 70-80 and 80-90. o5 := BlockMeta{MinTime: 61, MaxTime: 85} @@ -1059,7 +1069,7 @@ func TestOverlappingBlocksDetectsAllOverlaps(t *testing.T) { {Min: 61, Max: 70}: {metas[6], o5}, {Min: 70, Max: 80}: {o5, metas[7]}, {Min: 80, Max: 85}: {o5, metas[8]}, - }, OverlappingBlocks(append(metas, o5))) + }, OverlappingBlocks(add(o5))) // o6a overlaps with 90-100, 100-110 and o6b, o6b overlaps with 90-100 and o6a. 
o6a := BlockMeta{MinTime: 92, MaxTime: 105} @@ -1067,7 +1077,7 @@ func TestOverlappingBlocksDetectsAllOverlaps(t *testing.T) { testutil.Equals(t, Overlaps{ {Min: 94, Max: 99}: {metas[9], o6a, o6b}, {Min: 100, Max: 105}: {o6a, metas[10]}, - }, OverlappingBlocks(append(metas, o6a, o6b))) + }, OverlappingBlocks(add(o6a, o6b))) // All together. testutil.Equals(t, Overlaps{ @@ -1077,7 +1087,7 @@ func TestOverlappingBlocksDetectsAllOverlaps(t *testing.T) { {Min: 50, Max: 60}: {metas[5], o4}, {Min: 61, Max: 70}: {metas[6], o5}, {Min: 70, Max: 80}: {o5, metas[7]}, {Min: 80, Max: 85}: {o5, metas[8]}, {Min: 94, Max: 99}: {metas[9], o6a, o6b}, {Min: 100, Max: 105}: {o6a, metas[10]}, - }, OverlappingBlocks(append(metas, o1, o2, o3a, o3b, o4, o5, o6a, o6b))) + }, OverlappingBlocks(add(o1, o2, o3a, o3b, o4, o5, o6a, o6b))) // Additional case. var nc1 []BlockMeta @@ -1185,3 +1195,109 @@ func TestQuerierWithBoundaryChunks(t *testing.T) { count := len(q.(*querier).blocks) testutil.Assert(t, count == 2, "expected 2 blocks in querier, got %d", count) } + +func TestInitializeHeadTimestamp(t *testing.T) { + t.Run("clean", func(t *testing.T) { + dir, err := ioutil.TempDir("", "test_head_init") + testutil.Ok(t, err) + defer os.RemoveAll(dir) + + db, err := Open(dir, nil, nil, nil) + testutil.Ok(t, err) + + // Should be set to init values if no WAL or blocks exist so far. + testutil.Equals(t, int64(math.MaxInt64), db.head.MinTime()) + testutil.Equals(t, int64(math.MinInt64), db.head.MaxTime()) + + // First added sample initializes the writable range. 
+ app := db.Appender() + _, err = app.Add(labels.FromStrings("a", "b"), 1000, 1) + testutil.Ok(t, err) + + testutil.Equals(t, int64(1000), db.head.MinTime()) + testutil.Equals(t, int64(1000), db.head.MaxTime()) + }) + t.Run("wal-only", func(t *testing.T) { + dir, err := ioutil.TempDir("", "test_head_init") + testutil.Ok(t, err) + defer os.RemoveAll(dir) + + testutil.Ok(t, os.MkdirAll(path.Join(dir, "wal"), 0777)) + w, err := wal.New(nil, nil, path.Join(dir, "wal")) + testutil.Ok(t, err) + + var enc RecordEncoder + err = w.Log( + enc.Series([]RefSeries{ + {Ref: 123, Labels: labels.FromStrings("a", "1")}, + {Ref: 124, Labels: labels.FromStrings("a", "2")}, + }, nil), + enc.Samples([]RefSample{ + {Ref: 123, T: 5000, V: 1}, + {Ref: 124, T: 15000, V: 1}, + }, nil), + ) + testutil.Ok(t, err) + testutil.Ok(t, w.Close()) + + db, err := Open(dir, nil, nil, nil) + testutil.Ok(t, err) + + testutil.Equals(t, int64(5000), db.head.MinTime()) + testutil.Equals(t, int64(15000), db.head.MaxTime()) + }) + t.Run("existing-block", func(t *testing.T) { + dir, err := ioutil.TempDir("", "test_head_init") + testutil.Ok(t, err) + defer os.RemoveAll(dir) + + id := ulid.MustNew(2000, nil) + createEmptyBlock(t, path.Join(dir, id.String()), &BlockMeta{ + ULID: id, + MinTime: 1000, + MaxTime: 2000, + }) + + db, err := Open(dir, nil, nil, nil) + testutil.Ok(t, err) + + testutil.Equals(t, int64(2000), db.head.MinTime()) + testutil.Equals(t, int64(2000), db.head.MaxTime()) + }) + t.Run("existing-block-and-wal", func(t *testing.T) { + dir, err := ioutil.TempDir("", "test_head_init") + testutil.Ok(t, err) + defer os.RemoveAll(dir) + + id := ulid.MustNew(2000, nil) + createEmptyBlock(t, path.Join(dir, id.String()), &BlockMeta{ + ULID: id, + MinTime: 1000, + MaxTime: 6000, + }) + + testutil.Ok(t, os.MkdirAll(path.Join(dir, "wal"), 0777)) + w, err := wal.New(nil, nil, path.Join(dir, "wal")) + testutil.Ok(t, err) + + var enc RecordEncoder + err = w.Log( + enc.Series([]RefSeries{ + {Ref: 123, Labels: 
labels.FromStrings("a", "1")}, + {Ref: 124, Labels: labels.FromStrings("a", "2")}, + }, nil), + enc.Samples([]RefSample{ + {Ref: 123, T: 5000, V: 1}, + {Ref: 124, T: 15000, V: 1}, + }, nil), + ) + testutil.Ok(t, err) + testutil.Ok(t, w.Close()) + + db, err := Open(dir, nil, nil, nil) + testutil.Ok(t, err) + + testutil.Equals(t, int64(6000), db.head.MinTime()) + testutil.Equals(t, int64(15000), db.head.MaxTime()) + }) +} diff --git a/docs/format/wal.md b/docs/format/wal.md new file mode 100644 index 0000000000..7195e0bdfb --- /dev/null +++ b/docs/format/wal.md @@ -0,0 +1,86 @@ +# WAL Disk Format + +The write ahead log operates in segments that are numbered and sequential, +e.g. `000000`, `000001`, `000002`, etc., and are limited to 128MB by default. +A segment is written to in pages of 32KB. Only the last page of the most recent segment +may be partial. A WAL record is an opaque byte slice that gets split up into sub-records +should it exceed the remaining space of the current page. Records are never split across +segment boundaries. If a single record exceeds the default segment size, a segment with +a larger size will be created. +The encoding of pages is largely borrowed from [LevelDB's/RocksDB's write ahead log.](https://github.com/facebook/rocksdb/wiki/Write-Ahead-Log-File-Format) + +Notable deviations are that the record fragment is encoded as: + +``` +┌───────────┬──────────┬────────────┬──────────────┐ +│ type <1b> │ len <2b> │ CRC32 <4b> │ data │ +└───────────┴──────────┴────────────┴──────────────┘ +``` + +The type flag has the following states: + +* `0`: rest of page will be empty +* `1`: a full record encoded in a single fragment +* `2`: first fragment of a record +* `3`: middle fragment of a record +* `4`: final fragment of a record + +## Record encoding + +The records written to the write ahead log are encoded as follows: + +### Series records + +Series records encode the labels that identifies a series and its unique ID. 
+ +``` +┌────────────────────────────────────────────┐ +│ type = 1 <1b> │ +├────────────────────────────────────────────┤ +│ ┌─────────┬──────────────────────────────┐ │ +│ │ id <8b> │ n = len(labels) │ │ +│ ├─────────┴────────────┬─────────────────┤ │ +│ │ len(str_1) │ str_1 │ │ +│ ├──────────────────────┴─────────────────┤ │ +│ │ ... │ │ +│ ├───────────────────────┬────────────────┤ │ +│ │ len(str_2n) │ str_2n │ │ +│ └───────────────────────┴────────────────┘ │ +│ . . . │ +└────────────────────────────────────────────┘ +``` + +### Sample records + +Sample records encode samples as a list of triples `(series_id, timestamp, value)`. +Series reference and timestamp are encoded as deltas w.r.t the first sample. + +``` +┌──────────────────────────────────────────────────────────────────┐ +│ type = 2 <1b> │ +├──────────────────────────────────────────────────────────────────┤ +│ ┌────────────────────┬───────────────────────────┬─────────────┐ │ +│ │ id <8b> │ timestamp <8b> │ value <8b> │ │ +│ └────────────────────┴───────────────────────────┴─────────────┘ │ +│ ┌────────────────────┬───────────────────────────┬─────────────┐ │ +│ │ id_delta │ timestamp_delta │ value <8b> │ │ +│ └────────────────────┴───────────────────────────┴─────────────┘ │ +│ . . . │ +└──────────────────────────────────────────────────────────────────┘ +``` + +### Tombstone records + +Tombstone records encode tombstones as a list of triples `(series_id, min_time, max_time)` +and specify an interval for which samples of a series got deleted. + +``` +┌─────────────────────────────────────────────────────┐ +│ type = 3 <1b> │ +├─────────────────────────────────────────────────────┤ +│ ┌─────────┬───────────────────┬───────────────────┐ │ +│ │ id <8b> │ min_time │ max_time │ │ +│ └─────────┴───────────────────┴───────────────────┘ │ +│ . . . 
│ +└─────────────────────────────────────────────────────┘ +``` diff --git a/fileutil/fileutil.go b/fileutil/fileutil.go index c2c25842ad..2158bfd265 100644 --- a/fileutil/fileutil.go +++ b/fileutil/fileutil.go @@ -6,6 +6,7 @@ import ( "os" + "path/filepath" "sort" ) @@ -23,3 +24,45 @@ func ReadDir(dirpath string) ([]string, error) { sort.Strings(names) return names, nil } + +// Rename safely renames a file. +func Rename(from, to string) error { + if err := os.Rename(from, to); err != nil { + return err + } + + // Directory was renamed; sync parent dir to persist rename. + pdir, err := OpenDir(filepath.Dir(to)) + if err != nil { + return err + } + + if err = Fsync(pdir); err != nil { + pdir.Close() + return err + } + return pdir.Close() +} + +// Replace moves a file or directory to a new location and deletes any previous data. +// It is not atomic. +func Replace(from, to string) error { + if err := os.RemoveAll(to); err != nil { + return err + } + if err := os.Rename(from, to); err != nil { + return err + } + + // Directory was renamed; sync parent dir to persist rename. + pdir, err := OpenDir(filepath.Dir(to)) + if err != nil { + return err + } + + if err = Fsync(pdir); err != nil { + pdir.Close() + return err + } + return pdir.Close() +} diff --git a/head.go b/head.go index 372842865c..4f0c2c958a 100644 --- a/head.go +++ b/head.go @@ -15,6 +15,7 @@ package tsdb import ( "math" + "path/filepath" "runtime" "sort" "strings" @@ -30,6 +31,7 @@ import ( "github.com/prometheus/tsdb/chunks" "github.com/prometheus/tsdb/index" "github.com/prometheus/tsdb/labels" + "github.com/prometheus/tsdb/wal" ) var ( @@ -53,9 +55,10 @@ type Head struct { chunkRange int64 metrics *headMetrics - wal WAL + wal *wal.WAL logger log.Logger appendPool sync.Pool + bytesPool sync.Pool minTime, maxTime int64 lastSeriesID uint64 @@ -169,13 +172,10 @@ func newHeadMetrics(h *Head, r prometheus.Registerer) *headMetrics { } // NewHead opens the head block in dir. 
-func NewHead(r prometheus.Registerer, l log.Logger, wal WAL, chunkRange int64) (*Head, error) { +func NewHead(r prometheus.Registerer, l log.Logger, wal *wal.WAL, chunkRange int64) (*Head, error) { if l == nil { l = log.NewNopLogger() } - if wal == nil { - wal = NopWAL() - } if chunkRange < 1 { return nil, errors.Errorf("invalid chunk range %d", chunkRange) } @@ -183,7 +183,7 @@ func NewHead(r prometheus.Registerer, l log.Logger, wal WAL, chunkRange int64) ( wal: wal, logger: l, chunkRange: chunkRange, - minTime: math.MinInt64, + minTime: math.MaxInt64, maxTime: math.MinInt64, series: newStripeSeries(), values: map[string]stringset{}, @@ -200,15 +200,17 @@ func NewHead(r prometheus.Registerer, l log.Logger, wal WAL, chunkRange int64) ( // them on to other workers. // Samples before the mint timestamp are discarded. func (h *Head) processWALSamples( - mint int64, + minValidTime int64, partition, total uint64, input <-chan []RefSample, output chan<- []RefSample, ) (unknownRefs uint64) { defer close(output) + mint, maxt := int64(math.MaxInt64), int64(math.MinInt64) + for samples := range input { for _, s := range samples { - if s.T < mint || s.Ref%total != partition { + if s.T < minValidTime || s.Ref%total != partition { continue } ms := h.series.getByID(s.Ref) @@ -221,18 +223,48 @@ func (h *Head) processWALSamples( h.metrics.chunksCreated.Inc() h.metrics.chunks.Inc() } + if s.T > maxt { + maxt = s.T + } + if s.T < mint { + mint = s.T + } } output <- samples } + h.updateMinMaxTime(mint, maxt) + return unknownRefs } -// ReadWAL initializes the head by consuming the write ahead log. 
-func (h *Head) ReadWAL() error { - defer h.postings.EnsureOrder() +func (h *Head) updateMinMaxTime(mint, maxt int64) { + for { + lt := h.MinTime() + if mint >= lt { + break + } + if atomic.CompareAndSwapInt64(&h.minTime, lt, mint) { + break + } + } + for { + ht := h.MaxTime() + if maxt <= ht { + break + } + if atomic.CompareAndSwapInt64(&h.maxTime, ht, maxt) { + break + } + } +} - r := h.wal.Reader() - mint := h.MinTime() +func (h *Head) loadWAL(r *wal.Reader) error { + minValidTime := h.MinTime() + // If the min time is still uninitialized (no persisted blocks yet), + // we accept all sample timestamps from the WAL. + if minValidTime == math.MaxInt64 { + minValidTime = math.MinInt64 + } // Track number of samples that referenced a series we don't know about // for error reporting. @@ -253,7 +285,7 @@ func (h *Head) ReadWAL() error { output := make(chan []RefSample, 300) go func(i int, input <-chan []RefSample, output chan<- []RefSample) { - unknown := h.processWALSamples(mint, uint64(i), uint64(n), input, output) + unknown := h.processWALSamples(minValidTime, uint64(i), uint64(n), input, output) atomic.AddUint64(&unknownRefs, unknown) wg.Done() }(i, input, output) @@ -263,49 +295,71 @@ func (h *Head) ReadWAL() error { input = output } - // TODO(fabxc): series entries spread between samples can starve the sample workers. - // Even with bufferd channels, this can impact startup time with lots of series churn. - // We must not paralellize series creation itself but could make the indexing asynchronous. 
- seriesFunc := func(series []RefSeries) { - for _, s := range series { - h.getOrCreateWithID(s.Ref, s.Labels.Hash(), s.Labels) + var ( + dec RecordDecoder + series []RefSeries + samples []RefSample + tstones []Stone + ) + for r.Next() { + series, samples, tstones = series[:0], samples[:0], tstones[:0] + rec := r.Record() - if h.lastSeriesID < s.Ref { - h.lastSeriesID = s.Ref + switch dec.Type(rec) { + case RecordSeries: + series, err := dec.Series(rec, series) + if err != nil { + return errors.Wrap(err, "decode series") } - } - } - samplesFunc := func(samples []RefSample) { - // We split up the samples into chunks of 5000 samples or less. - // With O(300 * #cores) in-flight sample batches, large scrapes could otherwise - // cause thousands of very large in flight buffers occupying large amounts - // of unused memory. - for len(samples) > 0 { - n := 5000 - if len(samples) < n { - n = len(samples) - } - var buf []RefSample - select { - case buf = <-input: - default: - } - firstInput <- append(buf[:0], samples[:n]...) - samples = samples[n:] - } - } - deletesFunc := func(stones []Stone) { - for _, s := range stones { - for _, itv := range s.intervals { - if itv.Maxt < mint { - continue + for _, s := range series { + h.getOrCreateWithID(s.Ref, s.Labels.Hash(), s.Labels) + + if h.lastSeriesID < s.Ref { + h.lastSeriesID = s.Ref } - h.tombstones.addInterval(s.ref, itv) } + case RecordSamples: + samples, err := dec.Samples(rec, samples) + if err != nil { + return errors.Wrap(err, "decode samples") + } + // We split up the samples into chunks of 5000 samples or less. + // With O(300 * #cores) in-flight sample batches, large scrapes could otherwise + // cause thousands of very large in flight buffers occupying large amounts + // of unused memory. + for len(samples) > 0 { + n := 5000 + if len(samples) < n { + n = len(samples) + } + var buf []RefSample + select { + case buf = <-input: + default: + } + firstInput <- append(buf[:0], samples[:n]...) 
+ samples = samples[n:] + } + case RecordTombstones: + tstones, err := dec.Tombstones(rec, tstones) + if err != nil { + return errors.Wrap(err, "decode tombstones") + } + for _, s := range tstones { + for _, itv := range s.intervals { + if itv.Maxt < minValidTime { + continue + } + h.tombstones.addInterval(s.ref, itv) + } + } + default: + return errors.Errorf("invalid record type %v", dec.Type(rec)) } } - - err := r.Read(seriesFunc, samplesFunc, deletesFunc) + if r.Err() != nil { + return errors.Wrap(r.Err(), "read records") + } // Signal termination to first worker and wait for last one to close its output channel. close(firstInput) @@ -313,20 +367,64 @@ func (h *Head) ReadWAL() error { } wg.Wait() - if err != nil { - return errors.Wrap(err, "consume WAL") - } if unknownRefs > 0 { - level.Warn(h.logger).Log("msg", "unknown series references in WAL samples", "count", unknownRefs) + level.Warn(h.logger).Log("msg", "unknown series references", "count", unknownRefs) } return nil } -// Truncate removes all data before mint from the head block and truncates its WAL. -func (h *Head) Truncate(mint int64) error { - initialize := h.MinTime() == math.MinInt64 +// Init loads data from the write ahead log and prepares the head for writes. +func (h *Head) Init() error { + defer h.postings.EnsureOrder() - if h.MinTime() >= mint { + if h.wal == nil { + return nil + } + + // Backfill the checkpoint first if it exists. + cp, n, err := LastCheckpoint(h.wal.Dir()) + if err != nil && err != ErrNotFound { + return errors.Wrap(err, "find last checkpoint") + } + if err == nil { + sr, err := wal.NewSegmentsReader(filepath.Join(h.wal.Dir(), cp)) + if err != nil { + return errors.Wrap(err, "open checkpoint") + } + defer sr.Close() + + // A corrupted checkpoint is a hard error for now and requires user + // intervention. There's likely little data that can be recovered anyway. 
+ if err := h.loadWAL(wal.NewReader(sr)); err != nil { + return errors.Wrap(err, "backfill checkpoint") + } + n++ + } + + // Backfill segments from the last checkpoint onwards + sr, err := wal.NewSegmentsRangeReader(h.wal.Dir(), n, -1) + if err != nil { + return errors.Wrap(err, "open WAL segments") + } + defer sr.Close() + + err = h.loadWAL(wal.NewReader(sr)) + if err == nil { + return nil + } + level.Warn(h.logger).Log("msg", "encountered WAL error, attempting repair", "err", err) + + if err := h.wal.Repair(err); err != nil { + return errors.Wrap(err, "repair corrupted WAL") + } + return nil +} + +// Truncate removes old data before mint from the head. +func (h *Head) Truncate(mint int64) error { + initialize := h.MinTime() == math.MaxInt64 + + if h.MinTime() >= mint && !initialize { return nil } atomic.StoreInt64(&h.minTime, mint) @@ -348,18 +446,37 @@ func (h *Head) Truncate(mint int64) error { level.Info(h.logger).Log("msg", "head GC completed", "duration", time.Since(start)) h.metrics.gcDuration.Observe(time.Since(start).Seconds()) + if h.wal == nil { + return nil + } start = time.Now() + m, n, err := h.wal.Segments() + if err != nil { + return errors.Wrap(err, "get segment range") + } + n-- // Never consider last segment for checkpoint. + if n < 0 { + return nil // no segments yet. + } + // The lower third of segments should contain mostly obsolete samples. + // If we have less than three segments, it's not worth checkpointing yet. 
+ n = m + (n-m)/3 + if n <= m { + return nil + } + keep := func(id uint64) bool { return h.series.getByID(id) != nil } - if err := h.wal.Truncate(mint, keep); err == nil { - level.Info(h.logger).Log("msg", "WAL truncation completed", "duration", time.Since(start)) - } else { - level.Error(h.logger).Log("msg", "WAL truncation failed", "err", err, "duration", time.Since(start)) + if _, err = Checkpoint(h.logger, h.wal, m, n, keep, mint); err != nil { + return errors.Wrap(err, "create checkpoint") } h.metrics.walTruncateDuration.Observe(time.Since(start).Seconds()) + level.Info(h.logger).Log("msg", "WAL checkpoint complete", + "low", m, "high", n, "duration", time.Since(start)) + return nil } @@ -367,10 +484,7 @@ func (h *Head) Truncate(mint int64) error { // for a compltely fresh head with an empty WAL. // Returns true if the initialization took an effect. func (h *Head) initTime(t int64) (initialized bool) { - // In the init state, the head has a high timestamp of math.MinInt64. - mint, _ := rangeForTimestamp(t, h.chunkRange) - - if !atomic.CompareAndSwapInt64(&h.minTime, math.MinInt64, mint) { + if !atomic.CompareAndSwapInt64(&h.minTime, math.MaxInt64, t) { return false } // Ensure that max time is initialized to at least the min time we just set. @@ -441,7 +555,7 @@ func (h *Head) Appender() Appender { // The head cache might not have a starting point yet. The init appender // picks up the first appended timestamp as the base. 
- if h.MinTime() == math.MinInt64 { + if h.MinTime() == math.MaxInt64 { return &initAppender{head: h} } return h.appender() @@ -449,10 +563,11 @@ func (h *Head) Appender() Appender { func (h *Head) appender() *headAppender { return &headAppender{ - head: h, - mint: h.MaxTime() - h.chunkRange/2, - maxt: math.MinInt64, - samples: h.getAppendBuffer(), + head: h, + minValidTime: h.MaxTime() - h.chunkRange/2, + mint: math.MaxInt64, + maxt: math.MinInt64, + samples: h.getAppendBuffer(), } } @@ -468,16 +583,29 @@ func (h *Head) putAppendBuffer(b []RefSample) { h.appendPool.Put(b[:0]) } +func (h *Head) getBytesBuffer() []byte { + b := h.bytesPool.Get() + if b == nil { + return make([]byte, 0, 1024) + } + return b.([]byte) +} + +func (h *Head) putBytesBuffer(b []byte) { + h.bytesPool.Put(b[:0]) +} + type headAppender struct { - head *Head - mint, maxt int64 + head *Head + minValidTime int64 // No samples below this timestamp are allowed. + mint, maxt int64 series []RefSeries samples []RefSample } func (a *headAppender) Add(lset labels.Labels, t int64, v float64) (uint64, error) { - if t < a.mint { + if t < a.minValidTime { return 0, ErrOutOfBounds } @@ -504,9 +632,12 @@ func (a *headAppender) AddFast(ref uint64, t int64, v float64) error { if err != nil { return err } - if t < a.mint { + if t < a.minValidTime { return ErrOutOfBounds } + if t < a.mint { + a.mint = t + } if t > a.maxt { a.maxt = t } @@ -520,15 +651,42 @@ func (a *headAppender) AddFast(ref uint64, t int64, v float64) error { return nil } +func (a *headAppender) log() error { + if a.head.wal == nil { + return nil + } + + buf := a.head.getBytesBuffer() + defer func() { a.head.putBytesBuffer(buf) }() + + var rec []byte + var enc RecordEncoder + + if len(a.series) > 0 { + rec = enc.Series(a.series, buf) + buf = rec[:0] + + if err := a.head.wal.Log(rec); err != nil { + return errors.Wrap(err, "log series") + } + } + if len(a.samples) > 0 { + rec = enc.Samples(a.samples, buf) + buf = rec[:0] + + if err := 
a.head.wal.Log(rec); err != nil { + return errors.Wrap(err, "log samples") + } + } + return nil +} + func (a *headAppender) Commit() error { defer a.head.metrics.activeAppenders.Dec() defer a.head.putAppendBuffer(a.samples) - if err := a.head.wal.LogSeries(a.series); err != nil { - return err - } - if err := a.head.wal.LogSamples(a.samples); err != nil { - return errors.Wrap(err, "WAL log samples") + if err := a.log(); err != nil { + return errors.Wrap(err, "write to WAL") } total := len(a.samples) @@ -548,16 +706,7 @@ func (a *headAppender) Commit() error { } a.head.metrics.samplesAppended.Add(float64(total)) - - for { - ht := a.head.MaxTime() - if a.maxt <= ht { - break - } - if atomic.CompareAndSwapInt64(&a.head.maxTime, ht, a.maxt) { - break - } - } + a.head.updateMinMaxTime(a.mint, a.maxt) return nil } @@ -568,7 +717,8 @@ func (a *headAppender) Rollback() error { // Series are created in the head memory regardless of rollback. Thus we have // to log them to the WAL in any case. - return a.head.wal.LogSeries(a.series) + a.samples = nil + return a.log() } // Delete all samples in the range of [mint, maxt] for series that satisfy the given @@ -601,8 +751,12 @@ func (h *Head) Delete(mint, maxt int64, ms ...labels.Matcher) error { if p.Err() != nil { return p.Err() } - if err := h.wal.LogDeletes(stones); err != nil { - return err + var enc RecordEncoder + + if h.wal != nil { + if err := h.wal.Log(enc.Tombstones(stones, nil)); err != nil { + return err + } } for _, s := range stones { h.tombstones.addInterval(s.ref, s.intervals[0]) @@ -694,6 +848,9 @@ func (h *Head) MaxTime() int64 { // Close flushes the WAL and closes the head. 
func (h *Head) Close() error { + if h.wal == nil { + return nil + } return h.wal.Close() } diff --git a/head_test.go b/head_test.go index 9a8c893644..b06a66c26c 100644 --- a/head_test.go +++ b/head_test.go @@ -14,7 +14,9 @@ package tsdb import ( + "io/ioutil" "math/rand" + "os" "testing" "github.com/prometheus/tsdb/chunkenc" @@ -22,6 +24,7 @@ import ( "github.com/prometheus/tsdb/index" "github.com/prometheus/tsdb/labels" "github.com/prometheus/tsdb/testutil" + "github.com/prometheus/tsdb/wal" ) func BenchmarkCreateSeries(b *testing.B) { @@ -42,42 +45,50 @@ func BenchmarkCreateSeries(b *testing.B) { } } -type memoryWAL struct { - nopWAL - entries []interface{} -} - -func (w *memoryWAL) LogSeries(s []RefSeries) error { - w.entries = append(w.entries, s) - return nil -} - -func (w *memoryWAL) LogSamples(s []RefSample) error { - w.entries = append(w.entries, s) - return nil -} - -func (w *memoryWAL) LogDeletes(s []Stone) error { - w.entries = append(w.entries, s) - return nil -} - -func (w *memoryWAL) Reader() WALReader { - return w -} - -func (w *memoryWAL) Read(series func([]RefSeries), samples func([]RefSample), deletes func([]Stone)) error { - for _, e := range w.entries { - switch v := e.(type) { +func populateTestWAL(t testing.TB, w *wal.WAL, recs []interface{}) { + var enc RecordEncoder + for _, r := range recs { + switch v := r.(type) { case []RefSeries: - series(v) + testutil.Ok(t, w.Log(enc.Series(v, nil))) case []RefSample: - samples(v) + testutil.Ok(t, w.Log(enc.Samples(v, nil))) case []Stone: - deletes(v) + testutil.Ok(t, w.Log(enc.Tombstones(v, nil))) } } - return nil +} + +func readTestWAL(t testing.TB, dir string) (recs []interface{}) { + sr, err := wal.NewSegmentsReader(dir) + testutil.Ok(t, err) + defer sr.Close() + + var dec RecordDecoder + r := wal.NewReader(sr) + + for r.Next() { + rec := r.Record() + + switch dec.Type(rec) { + case RecordSeries: + series, err := dec.Series(rec, nil) + testutil.Ok(t, err) + recs = append(recs, series) + case 
RecordSamples: + samples, err := dec.Samples(rec, nil) + testutil.Ok(t, err) + recs = append(recs, samples) + case RecordTombstones: + tstones, err := dec.Tombstones(rec, nil) + testutil.Ok(t, err) + recs = append(recs, tstones) + default: + t.Fatalf("unknown record type") + } + } + testutil.Ok(t, r.Err()) + return recs } func TestHead_ReadWAL(t *testing.T) { @@ -100,13 +111,19 @@ func TestHead_ReadWAL(t *testing.T) { {Ref: 50, T: 101, V: 6}, }, } - wal := &memoryWAL{entries: entries} + dir, err := ioutil.TempDir("", "test_read_wal") + testutil.Ok(t, err) + defer os.RemoveAll(dir) - head, err := NewHead(nil, nil, wal, 1000) + w, err := wal.New(nil, nil, dir) + testutil.Ok(t, err) + populateTestWAL(t, w, entries) + + head, err := NewHead(nil, nil, w, 1000) testutil.Ok(t, err) defer head.Close() - testutil.Ok(t, head.ReadWAL()) + testutil.Ok(t, head.Init()) testutil.Equals(t, uint64(100), head.lastSeriesID) s10 := head.series.getByID(10) @@ -259,13 +276,19 @@ func TestHeadDeleteSeriesWithoutSamples(t *testing.T) { {Ref: 50, T: 90, V: 1}, }, } - wal := &memoryWAL{entries: entries} + dir, err := ioutil.TempDir("", "test_delete_series") + testutil.Ok(t, err) + defer os.RemoveAll(dir) - head, err := NewHead(nil, nil, wal, 1000) + w, err := wal.New(nil, nil, dir) + testutil.Ok(t, err) + populateTestWAL(t, w, entries) + + head, err := NewHead(nil, nil, w, 1000) testutil.Ok(t, err) defer head.Close() - testutil.Ok(t, head.ReadWAL()) + testutil.Ok(t, head.Init()) testutil.Ok(t, head.Delete(0, 100, labels.NewEqualMatcher("a", "1"))) } @@ -705,7 +728,7 @@ func TestMemSeries_append(t *testing.T) { func TestGCChunkAccess(t *testing.T) { // Put a chunk, select it. GC it and then access it. - h, err := NewHead(nil, nil, NopWAL(), 1000) + h, err := NewHead(nil, nil, nil, 1000) testutil.Ok(t, err) defer h.Close() @@ -745,7 +768,7 @@ func TestGCChunkAccess(t *testing.T) { func TestGCSeriesAccess(t *testing.T) { // Put a series, select it. GC it and then access it. 
- h, err := NewHead(nil, nil, NopWAL(), 1000) + h, err := NewHead(nil, nil, nil, 1000) testutil.Ok(t, err) defer h.Close() @@ -786,7 +809,12 @@ func TestGCSeriesAccess(t *testing.T) { } func TestHead_LogRollback(t *testing.T) { - w := &memoryWAL{} + dir, err := ioutil.TempDir("", "wal_rollback") + testutil.Ok(t, err) + defer os.RemoveAll(dir) + + w, err := wal.New(nil, nil, dir) + testutil.Ok(t, err) h, err := NewHead(nil, nil, w, 1000) testutil.Ok(t, err) @@ -795,9 +823,11 @@ func TestHead_LogRollback(t *testing.T) { testutil.Ok(t, err) testutil.Ok(t, app.Rollback()) - testutil.Equals(t, 1, len(w.entries)) + recs := readTestWAL(t, w.Dir()) - series, ok := w.entries[0].([]RefSeries) - testutil.Assert(t, ok, "expected series record but got %+v", w.entries[0]) + testutil.Equals(t, 1, len(recs)) + + series, ok := recs[0].([]RefSeries) + testutil.Assert(t, ok, "expected series record but got %+v", recs[0]) testutil.Equals(t, series, []RefSeries{{Ref: 1, Labels: labels.FromStrings("a", "b")}}) } diff --git a/record.go b/record.go new file mode 100644 index 0000000000..c8cc7a5043 --- /dev/null +++ b/record.go @@ -0,0 +1,213 @@ +// Copyright 2018 The Prometheus Authors + +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package tsdb + +import ( + "math" + "sort" + + "github.com/pkg/errors" + "github.com/prometheus/tsdb/labels" +) + +// RecordType represents the data type of a record. 
+type RecordType uint8 + +const ( + RecordInvalid RecordType = 255 + RecordSeries RecordType = 1 + RecordSamples RecordType = 2 + RecordTombstones RecordType = 3 +) + +type RecordLogger interface { + Log(recs ...[]byte) error +} + +type RecordReader interface { + Next() bool + Err() error + Record() []byte +} + +// RecordDecoder decodes series, sample, and tombstone records. +// The zero value is ready to use. +type RecordDecoder struct { +} + +// Type returns the type of the record. +// Return RecordInvalid if no valid record type is found. +func (d *RecordDecoder) Type(rec []byte) RecordType { + if len(rec) < 1 { + return RecordInvalid + } + switch t := RecordType(rec[0]); t { + case RecordSeries, RecordSamples, RecordTombstones: + return t + } + return RecordInvalid +} + +// Series appends series in rec to the given slice. +func (d *RecordDecoder) Series(rec []byte, series []RefSeries) ([]RefSeries, error) { + dec := decbuf{b: rec} + + if RecordType(dec.byte()) != RecordSeries { + return nil, errors.New("invalid record type") + } + for len(dec.b) > 0 && dec.err() == nil { + ref := dec.be64() + + lset := make(labels.Labels, dec.uvarint()) + + for i := range lset { + lset[i].Name = dec.uvarintStr() + lset[i].Value = dec.uvarintStr() + } + sort.Sort(lset) + + series = append(series, RefSeries{ + Ref: ref, + Labels: lset, + }) + } + if dec.err() != nil { + return nil, dec.err() + } + if len(dec.b) > 0 { + return nil, errors.Errorf("unexpected %d bytes left in entry", len(dec.b)) + } + return series, nil +} + +// Samples appends samples in rec to the given slice. 
+func (d *RecordDecoder) Samples(rec []byte, samples []RefSample) ([]RefSample, error) { + dec := decbuf{b: rec} + + if RecordType(dec.byte()) != RecordSamples { + return nil, errors.New("invalid record type") + } + if dec.len() == 0 { + return samples, nil + } + var ( + baseRef = dec.be64() + baseTime = dec.be64int64() + ) + for len(dec.b) > 0 && dec.err() == nil { + dref := dec.varint64() + dtime := dec.varint64() + val := dec.be64() + + samples = append(samples, RefSample{ + Ref: uint64(int64(baseRef) + dref), + T: baseTime + dtime, + V: math.Float64frombits(val), + }) + } + + if dec.err() != nil { + return nil, errors.Wrapf(dec.err(), "decode error after %d samples", len(samples)) + } + if len(dec.b) > 0 { + return nil, errors.Errorf("unexpected %d bytes left in entry", len(dec.b)) + } + return samples, nil +} + +// Tombstones appends tombstones in rec to the given slice. +func (d *RecordDecoder) Tombstones(rec []byte, tstones []Stone) ([]Stone, error) { + dec := decbuf{b: rec} + + if RecordType(dec.byte()) != RecordTombstones { + return nil, errors.New("invalid record type") + } + for dec.len() > 0 && dec.err() == nil { + tstones = append(tstones, Stone{ + ref: dec.be64(), + intervals: Intervals{ + {Mint: dec.varint64(), Maxt: dec.varint64()}, + }, + }) + } + if dec.err() != nil { + return nil, dec.err() + } + if len(dec.b) > 0 { + return nil, errors.Errorf("unexpected %d bytes left in entry", len(dec.b)) + } + return tstones, nil +} + +// RecordEncoder encodes series, sample, and tombstones records. +// The zero value is ready to use. +type RecordEncoder struct { +} + +// Series appends the encoded series to b and returns the resulting slice. 
+func (e *RecordEncoder) Series(series []RefSeries, b []byte) []byte { + buf := encbuf{b: b} + buf.putByte(byte(RecordSeries)) + + for _, s := range series { + buf.putBE64(s.Ref) + buf.putUvarint(len(s.Labels)) + + for _, l := range s.Labels { + buf.putUvarintStr(l.Name) + buf.putUvarintStr(l.Value) + } + } + return buf.get() +} + +// Samples appends the encoded samples to b and returns the resulting slice. +func (e *RecordEncoder) Samples(samples []RefSample, b []byte) []byte { + buf := encbuf{b: b} + buf.putByte(byte(RecordSamples)) + + if len(samples) == 0 { + return buf.get() + } + + // Store base timestamp and base reference number of first sample. + // All samples encode their timestamp and ref as delta to those. + first := samples[0] + + buf.putBE64(first.Ref) + buf.putBE64int64(first.T) + + for _, s := range samples { + buf.putVarint64(int64(s.Ref) - int64(first.Ref)) + buf.putVarint64(s.T - first.T) + buf.putBE64(math.Float64bits(s.V)) + } + return buf.get() +} + +// Tombstones appends the encoded tombstones to b and returns the resulting slice. +func (e *RecordEncoder) Tombstones(tstones []Stone, b []byte) []byte { + buf := encbuf{b: b} + buf.putByte(byte(RecordTombstones)) + + for _, s := range tstones { + for _, iv := range s.intervals { + buf.putBE64(s.ref) + buf.putVarint64(iv.Mint) + buf.putVarint64(iv.Maxt) + } + } + return buf.get() +} diff --git a/record_test.go b/record_test.go new file mode 100644 index 0000000000..4257fc0c52 --- /dev/null +++ b/record_test.go @@ -0,0 +1,73 @@ +// Copyright 2018 The Prometheus Authors + +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. 
+// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package tsdb + +import ( + "testing" + + "github.com/prometheus/tsdb/labels" + "github.com/prometheus/tsdb/testutil" +) + +func TestRecord_EncodeDecode(t *testing.T) { + var enc RecordEncoder + var dec RecordDecoder + + series := []RefSeries{ + { + Ref: 100, + Labels: labels.FromStrings("abc", "def", "123", "456"), + }, { + Ref: 1, + Labels: labels.FromStrings("abc", "def2", "1234", "4567"), + }, { + Ref: 435245, + Labels: labels.FromStrings("xyz", "def", "foo", "bar"), + }, + } + decSeries, err := dec.Series(enc.Series(series, nil), nil) + testutil.Ok(t, err) + testutil.Equals(t, series, decSeries) + + samples := []RefSample{ + {Ref: 0, T: 12423423, V: 1.2345}, + {Ref: 123, T: -1231, V: -123}, + {Ref: 2, T: 0, V: 99999}, + } + decSamples, err := dec.Samples(enc.Samples(samples, nil), nil) + testutil.Ok(t, err) + testutil.Equals(t, samples, decSamples) + + // Intervals get split up into single entries. So we don't get back exactly + // what we put in. 
+ tstones := []Stone{ + {ref: 123, intervals: Intervals{ + {Mint: -1000, Maxt: 1231231}, + {Mint: 5000, Maxt: 0}, + }}, + {ref: 13, intervals: Intervals{ + {Mint: -1000, Maxt: -11}, + {Mint: 5000, Maxt: 1000}, + }}, + } + decTstones, err := dec.Tombstones(enc.Tombstones(tstones, nil), nil) + testutil.Ok(t, err) + testutil.Equals(t, []Stone{ + {ref: 123, intervals: Intervals{{Mint: -1000, Maxt: 1231231}}}, + {ref: 123, intervals: Intervals{{Mint: 5000, Maxt: 0}}}, + {ref: 13, intervals: Intervals{{Mint: -1000, Maxt: -11}}}, + {ref: 13, intervals: Intervals{{Mint: 5000, Maxt: 1000}}}, + }, decTstones) +} diff --git a/repair_test.go b/repair_test.go index f4c9d20874..c80976002a 100644 --- a/repair_test.go +++ b/repair_test.go @@ -76,7 +76,7 @@ func TestRepairBadIndexVersion(t *testing.T) { } // On DB opening all blocks in the base dir should be repaired. - db, _ := Open("testdata/repair_index_version", nil, nil, nil) + db, err := Open("testdata/repair_index_version", nil, nil, nil) if err != nil { t.Fatal(err) } diff --git a/wal.go b/wal.go index c1b9f6b062..972fdea38c 100644 --- a/wal.go +++ b/wal.go @@ -33,6 +33,7 @@ import ( "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/tsdb/fileutil" "github.com/prometheus/tsdb/labels" + "github.com/prometheus/tsdb/wal" ) // WALEntryType indicates what data a WAL entry contains. @@ -82,6 +83,8 @@ func newWalMetrics(wal *SegmentWAL, r prometheus.Registerer) *walMetrics { // WAL is a write ahead log that can log new series labels and samples. // It must be completely read before new entries are logged. +// +// DEPRECATED: use wal pkg combined with the record codecs instead. type WAL interface { Reader() WALReader LogSeries([]RefSeries) error @@ -173,6 +176,8 @@ func newCRC32() hash.Hash32 { } // SegmentWAL is a write ahead log for series data. +// +// DEPRECATED: use wal pkg combined with the record codecs instead. 
type SegmentWAL struct { mtx sync.Mutex metrics *walMetrics @@ -1206,3 +1211,100 @@ func (r *walReader) decodeDeletes(flag byte, b []byte, res *[]Stone) error { } return nil } + +// MigrateWAL rewrites the deprecated write ahead log into the new format. +func MigrateWAL(logger log.Logger, dir string) (err error) { + if logger == nil { + logger = log.NewNopLogger() + } + // Detect whether we still have the old WAL. + fns, err := sequenceFiles(dir) + if err != nil && !os.IsNotExist(err) { + return errors.Wrap(err, "list sequence files") + } + if len(fns) == 0 { + return nil // No WAL at all yet. + } + // Check header of first segment to see whether we are still dealing with an + // old WAL. + f, err := os.Open(fns[0]) + if err != nil { + return errors.Wrap(err, "check first existing segment") + } + defer f.Close() + + var hdr [4]byte + if _, err := f.Read(hdr[:]); err != nil && err != io.EOF { + return errors.Wrap(err, "read header from first segment") + } + // If we cannot read the magic header for segments of the old WAL, abort. + // Either it's migrated already or there's a corruption issue with which + // we cannot deal here anyway. Subsequent attempts to open the WAL will error in that case. + if binary.BigEndian.Uint32(hdr[:]) != WALMagic { + return nil + } + + level.Info(logger).Log("msg", "migrating WAL format") + + tmpdir := dir + ".tmp" + if err := os.RemoveAll(tmpdir); err != nil { + return errors.Wrap(err, "cleanup replacement dir") + } + repl, err := wal.New(logger, nil, tmpdir) + if err != nil { + return errors.Wrap(err, "open new WAL") + } + // It should've already been closed as part of the previous finalization. + // Do it once again in case of prior errors. 
+ defer func() { + if err != nil { + repl.Close() + } + }() + + w, err := OpenSegmentWAL(dir, logger, time.Minute, nil) + if err != nil { + return errors.Wrap(err, "open old WAL") + } + defer w.Close() + + rdr := w.Reader() + + var ( + enc RecordEncoder + b []byte + ) + decErr := rdr.Read( + func(s []RefSeries) { + if err != nil { + return + } + err = repl.Log(enc.Series(s, b[:0])) + }, + func(s []RefSample) { + if err != nil { + return + } + err = repl.Log(enc.Samples(s, b[:0])) + }, + func(s []Stone) { + if err != nil { + return + } + err = repl.Log(enc.Tombstones(s, b[:0])) + }, + ) + if decErr != nil { + return errors.Wrap(decErr, "decode old entries") + } + if err != nil { + return errors.Wrap(err, "write new entries") + } + if err := repl.Close(); err != nil { + return errors.Wrap(err, "close new WAL") + } + if err := fileutil.Replace(tmpdir, dir); err != nil { + return errors.Wrap(err, "replace old WAL") + } + return nil +} diff --git a/wal/wal.go b/wal/wal.go new file mode 100644 index 0000000000..e59b0e15d0 --- /dev/null +++ b/wal/wal.go @@ -0,0 +1,822 @@ +// Copyright 2017 The Prometheus Authors + +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+ +package wal + +import ( + "bufio" + "encoding/binary" + "fmt" + "hash/crc32" + "io" + "math" + "os" + "path/filepath" + "sort" + "strconv" + "sync" + "time" + + "github.com/go-kit/kit/log" + "github.com/go-kit/kit/log/level" + "github.com/pkg/errors" + "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/tsdb/fileutil" +) + +const ( + defaultSegmentSize = 128 * 1024 * 1024 // 128 MB + pageSize = 32 * 1024 // 32KB + recordHeaderSize = 7 +) + +// The table gets initialized with sync.Once but may still cause a race +// with any other use of the crc32 package anywhere. Thus we initialize it +// before. +var castagnoliTable = crc32.MakeTable(crc32.Castagnoli) + +type page struct { + alloc int + flushed int + buf [pageSize]byte +} + +func (p *page) remaining() int { + return pageSize - p.alloc +} + +func (p *page) full() bool { + return pageSize-p.alloc < recordHeaderSize +} + +// Segment represents a segment file. +type Segment struct { + *os.File + dir string + i int +} + +// Index returns the index of the segment. +func (s *Segment) Index() int { + return s.i +} + +// Dir returns the directory of the segment. +func (s *Segment) Dir() string { + return s.dir +} + +// CorruptionErr is an error that's returned when corruption is encountered. +type CorruptionErr struct { + Segment int + Offset int + Err error +} + +func (e *CorruptionErr) Error() string { + if e.Segment < 0 { + return fmt.Sprintf("corruption after %d bytes: %s", e.Offset, e.Err) + } + return fmt.Sprintf("corruption in segment %d at %d: %s", e.Segment, e.Offset, e.Err) +} + +// OpenWriteSegment opens segment k in dir. The returned segment is ready for new appends. +func OpenWriteSegment(dir string, k int) (*Segment, error) { + f, err := os.OpenFile(SegmentName(dir, k), os.O_WRONLY|os.O_APPEND, 0666) + if err != nil { + return nil, err + } + stat, err := f.Stat() + if err != nil { + f.Close() + return nil, err + } + // If the last page is torn, fill it with zeros. 
+ // In case it was torn after all records were written successfully, this + // will just pad the page and everything will be fine. + // If it was torn mid-record, a full read (which the caller should do anyway + // to ensure integrity) will detect it as a corruption by the end. + if d := stat.Size() % pageSize; d != 0 { + if _, err := f.Write(make([]byte, pageSize-d)); err != nil { + f.Close() + return nil, errors.Wrap(err, "zero-pad torn page") + } + } + return &Segment{File: f, i: k, dir: dir}, nil +} + +// CreateSegment creates a new segment k in dir. +func CreateSegment(dir string, k int) (*Segment, error) { + f, err := os.OpenFile(SegmentName(dir, k), os.O_WRONLY|os.O_CREATE|os.O_APPEND, 0666) + if err != nil { + return nil, err + } + return &Segment{File: f, i: k, dir: dir}, nil +} + +// OpenReadSegment opens the segment with the given filename. +func OpenReadSegment(fn string) (*Segment, error) { + k, err := strconv.Atoi(filepath.Base(fn)) + if err != nil { + return nil, errors.New("not a valid filename") + } + f, err := os.Open(fn) + if err != nil { + return nil, err + } + return &Segment{File: f, i: k, dir: filepath.Dir(fn)}, nil +} + +// WAL is a write ahead log that stores records in segment files. +// It must be read from start to end once before logging new data. +// If an error occurs during read, the repair procedure must be called +// before it's safe to do further writes. +// +// Segments are written to in pages of 32KB, with records possibly split +// across page boundaries. +// Records are never split across segments to allow full segments to be +// safely truncated. It also ensures that torn writes never corrupt records +// beyond the most recent segment. 
+type WAL struct { + dir string + logger log.Logger + segmentSize int + mtx sync.RWMutex + segment *Segment // active segment + donePages int // pages written to the segment + page *page // active page + stopc chan chan struct{} + actorc chan func() + + fsyncDuration prometheus.Summary + pageFlushes prometheus.Counter + pageCompletions prometheus.Counter +} + +// New returns a new WAL over the given directory. +func New(logger log.Logger, reg prometheus.Registerer, dir string) (*WAL, error) { + return NewSize(logger, reg, dir, defaultSegmentSize) +} + +// NewSize returns a new WAL over the given directory. +// New segments are created with the specified size. +func NewSize(logger log.Logger, reg prometheus.Registerer, dir string, segmentSize int) (*WAL, error) { + if segmentSize%pageSize != 0 { + return nil, errors.New("invalid segment size") + } + if err := os.MkdirAll(dir, 0777); err != nil { + return nil, errors.Wrap(err, "create dir") + } + if logger == nil { + logger = log.NewNopLogger() + } + w := &WAL{ + dir: dir, + logger: logger, + segmentSize: segmentSize, + page: &page{}, + actorc: make(chan func(), 100), + stopc: make(chan chan struct{}), + } + w.fsyncDuration = prometheus.NewSummary(prometheus.SummaryOpts{ + Name: "prometheus_tsdb_wal_fsync_duration_seconds", + Help: "Duration of WAL fsync.", + }) + w.pageFlushes = prometheus.NewCounter(prometheus.CounterOpts{ + Name: "prometheus_tsdb_wal_page_flushes_total", + Help: "Total number of page flushes.", + }) + w.pageCompletions = prometheus.NewCounter(prometheus.CounterOpts{ + Name: "prometheus_tsdb_wal_completed_pages_total", + Help: "Total number of completed pages.", + }) + if reg != nil { + reg.MustRegister(w.fsyncDuration, w.pageFlushes, w.pageCompletions) + } + + _, j, err := w.Segments() + if err != nil { + return nil, errors.Wrap(err, "get segment range") + } + // Fresh dir, no segments yet. 
+ if j == -1 { + if w.segment, err = CreateSegment(w.dir, 0); err != nil { + return nil, err + } + } else { + if w.segment, err = OpenWriteSegment(w.dir, j); err != nil { + return nil, err + } + // Correctly initialize donePages. + stat, err := w.segment.Stat() + if err != nil { + return nil, err + } + w.donePages = int(stat.Size() / pageSize) + } + go w.run() + + return w, nil +} + +// Dir returns the directory of the WAL. +func (w *WAL) Dir() string { + return w.dir +} + +func (w *WAL) run() { +Loop: + for { + select { + case f := <-w.actorc: + f() + case donec := <-w.stopc: + close(w.actorc) + defer close(donec) + break Loop + } + } + // Drain and process any remaining functions. + for f := range w.actorc { + f() + } +} + +// Repair attempts to repair the WAL based on the error. +// It discards all data after the corruption. +func (w *WAL) Repair(err error) error { + // We could probably have a mode that only discards torn records right around + // the corruption to preserve as data much as possible. + // But that's not generally applicable if the records have any kind of causality. + // Maybe as an extra mode in the future if mid-WAL corruptions become + // a frequent concern. + cerr, ok := err.(*CorruptionErr) + if !ok { + return errors.New("cannot handle error") + } + if cerr.Segment < 0 { + return errors.New("corruption error does not specify position") + } + + level.Warn(w.logger).Log("msg", "starting corruption repair", + "segment", cerr.Segment, "offset", cerr.Offset) + + // All segments behind the corruption can no longer be used. 
+ segs, err := listSegments(w.dir) + if err != nil { + return errors.Wrap(err, "list segments") + } + level.Warn(w.logger).Log("msg", "deleting all segments behind corruption") + + for _, s := range segs { + if s.n <= cerr.Segment { + continue + } + if err := os.Remove(filepath.Join(w.dir, s.s)); err != nil { + return errors.Wrap(err, "delete segment") + } + } + // Regardless of the corruption offset, no record reaches into the previous segment. + // So we can safely repair the WAL by removing the segment and re-inserting all + // its records up to the corruption. + level.Warn(w.logger).Log("msg", "rewrite corrupted segment") + + fn := SegmentName(w.dir, cerr.Segment) + tmpfn := fn + ".repair" + + if err := fileutil.Rename(fn, tmpfn); err != nil { + return err + } + // Create a clean segment and make it the active one. + s, err := CreateSegment(w.dir, cerr.Segment) + if err != nil { + return err + } + w.segment = s + + f, err := os.Open(tmpfn) + if err != nil { + return errors.Wrap(err, "open segment") + } + defer f.Close() + r := NewReader(bufio.NewReader(f)) + + for r.Next() { + if err := w.Log(r.Record()); err != nil { + return errors.Wrap(err, "insert record") + } + } + // We expect an error here, so nothing to handle. + + if err := os.Remove(tmpfn); err != nil { + return errors.Wrap(err, "delete corrupted segment") + } + return nil +} + +// SegmentName builds a segment name for the directory. +func SegmentName(dir string, i int) string { + return filepath.Join(dir, fmt.Sprintf("%08d", i)) +} + +// nextSegment creates the next segment and closes the previous one. +func (w *WAL) nextSegment() error { + // Only flush the current page if it actually holds data. 
+ if w.page.alloc > 0 { + if err := w.flushPage(true); err != nil { + return err + } + } + next, err := CreateSegment(w.dir, w.segment.Index()+1) + if err != nil { + return errors.Wrap(err, "create new segment file") + } + prev := w.segment + w.segment = next + w.donePages = 0 + + // Don't block further writes by fsyncing the last segment. + w.actorc <- func() { + if err := w.fsync(prev); err != nil { + level.Error(w.logger).Log("msg", "sync previous segment", "err", err) + } + if err := prev.Close(); err != nil { + level.Error(w.logger).Log("msg", "close previous segment", "err", err) + } + } + return nil +} + +// flushPage writes the new contents of the page to disk. If no more records will fit into +// the page, the remaining bytes will be set to zero and a new page will be started. +// If clear is true, this is enforced regardless of how many bytes are left in the page. +func (w *WAL) flushPage(clear bool) error { + w.pageFlushes.Inc() + + p := w.page + clear = clear || p.full() + + // No more data will fit into the page. Enqueue and clear it. + if clear { + p.alloc = pageSize // write till end of page + w.pageCompletions.Inc() + } + n, err := w.segment.Write(p.buf[p.flushed:p.alloc]) + if err != nil { + return err + } + p.flushed += n + + // We flushed an entire page, prepare a new one. + if clear { + for i := range p.buf { + p.buf[i] = 0 + } + p.alloc = 0 + p.flushed = 0 + w.donePages++ + } + return nil +} + +type recType uint8 + +const ( + recPageTerm recType = 0 // Rest of page is empty. + recFull recType = 1 // Full record. + recFirst recType = 2 // First fragment of a record. + recMiddle recType = 3 // Middle fragments of a record. + recLast recType = 4 // Final fragment of a record. 
+) + +func (t recType) String() string { + switch t { + case recPageTerm: + return "zero" + case recFull: + return "full" + case recFirst: + return "first" + case recMiddle: + return "middle" + case recLast: + return "last" + default: + return "" + } +} + +func (w *WAL) pagesPerSegment() int { + return w.segmentSize / pageSize +} + +// Log writes the records into the log. +// Multiple records can be passed at once to reduce writes and increase throughput. +func (w *WAL) Log(recs ...[]byte) error { + w.mtx.Lock() + defer w.mtx.Unlock() + // Callers could just implement their own list record format but adding + // a bit of extra logic here frees them from that overhead. + for i, r := range recs { + if err := w.log(r, i == len(recs)-1); err != nil { + return err + } + } + return nil +} + +// log writes rec to the log and forces a flush of the current page if its +// the final record of a batch. +func (w *WAL) log(rec []byte, final bool) error { + // If the record is too big to fit within pages in the current + // segment, terminate the active segment and advance to the next one. + // This ensures that records do not cross segment boundaries. + left := w.page.remaining() - recordHeaderSize // Active pages. + left += (pageSize - recordHeaderSize) * (w.pagesPerSegment() - w.donePages - 1) // Free pages. + + if len(rec) > left { + if err := w.nextSegment(); err != nil { + return err + } + } + + // Populate as many pages as necessary to fit the record. + // Be careful to always do one pass to ensure we write zero-length records. + for i := 0; i == 0 || len(rec) > 0; i++ { + p := w.page + + // Find how much of the record we can fit into the page. 
+ var ( + l = min(len(rec), (pageSize-p.alloc)-recordHeaderSize) + part = rec[:l] + buf = p.buf[p.alloc:] + typ recType + ) + + switch { + case i == 0 && len(part) == len(rec): + typ = recFull + case len(part) == len(rec): + typ = recLast + case i == 0: + typ = recFirst + default: + typ = recMiddle + } + + buf[0] = byte(typ) + crc := crc32.Checksum(part, castagnoliTable) + binary.BigEndian.PutUint16(buf[1:], uint16(len(part))) + binary.BigEndian.PutUint32(buf[3:], crc) + + copy(buf[recordHeaderSize:], part) + p.alloc += len(part) + recordHeaderSize + + // If we wrote a full record, we can fit more records of the batch + // into the page before flushing it. + if final || typ != recFull || w.page.full() { + if err := w.flushPage(false); err != nil { + return err + } + } + rec = rec[l:] + } + return nil +} + +// Segments returns the range [m, n] of currently existing segments. +// If no segments are found, m and n are -1. +func (w *WAL) Segments() (m, n int, err error) { + refs, err := listSegments(w.dir) + if err != nil { + return 0, 0, err + } + if len(refs) == 0 { + return -1, -1, nil + } + return refs[0].n, refs[len(refs)-1].n, nil +} + +// Truncate drops all segments before i. +func (w *WAL) Truncate(i int) error { + refs, err := listSegments(w.dir) + if err != nil { + return err + } + for _, r := range refs { + if r.n >= i { + break + } + if err := os.Remove(filepath.Join(w.dir, r.s)); err != nil { + return err + } + } + return nil +} + +func (w *WAL) fsync(f *Segment) error { + start := time.Now() + err := fileutil.Fsync(f.File) + w.fsyncDuration.Observe(time.Since(start).Seconds()) + return err +} + +// Close flushes all writes and closes active segment. +func (w *WAL) Close() (err error) { + w.mtx.Lock() + defer w.mtx.Unlock() + + // Flush the last page and zero out all its remaining size. + // We must not flush an empty page as it would falsely signal + // the segment is done if we start writing to it again after opening. 
+ if w.page.alloc > 0 { + if err := w.flushPage(true); err != nil { + return err + } + } + + donec := make(chan struct{}) + w.stopc <- donec + <-donec + + if err = w.fsync(w.segment); err != nil { + level.Error(w.logger).Log("msg", "sync previous segment", "err", err) + } + if err := w.segment.Close(); err != nil { + level.Error(w.logger).Log("msg", "close previous segment", "err", err) + } + + return nil +} + +type segmentRef struct { + s string + n int +} + +func listSegments(dir string) (refs []segmentRef, err error) { + files, err := fileutil.ReadDir(dir) + if err != nil { + return nil, err + } + var last int + for _, fn := range files { + k, err := strconv.Atoi(fn) + if err != nil { + continue + } + if len(refs) > 0 && k > last+1 { + return nil, errors.New("segments are not sequential") + } + refs = append(refs, segmentRef{s: fn, n: k}) + last = k + } + sort.Slice(refs, func(i, j int) bool { + return refs[i].n < refs[j].n + }) + return refs, nil +} + +// NewSegmentsReader returns a new reader over all segments in the directory. +func NewSegmentsReader(dir string) (io.ReadCloser, error) { + return NewSegmentsRangeReader(dir, 0, math.MaxInt64) +} + +// NewSegmentsRangeReader returns a new reader over the given WAL segment range. +// If m or n are -1, the range is open on the respective end. +func NewSegmentsRangeReader(dir string, m, n int) (io.ReadCloser, error) { + refs, err := listSegments(dir) + if err != nil { + return nil, err + } + var segs []*Segment + + for _, r := range refs { + if m >= 0 && r.n < m { + continue + } + if n >= 0 && r.n > n { + break + } + s, err := OpenReadSegment(filepath.Join(dir, r.s)) + if err != nil { + return nil, err + } + segs = append(segs, s) + } + return newSegmentBufReader(segs...), nil +} + +// segmentBufReader is a buffered reader that reads in multiples of pages. +// The main purpose is that we are able to track segment and offset for +// corruption reporting. 
+type segmentBufReader struct { + buf *bufio.Reader + segs []*Segment + cur int + off int + more bool +} + +func newSegmentBufReader(segs ...*Segment) *segmentBufReader { + return &segmentBufReader{ + buf: bufio.NewReaderSize(nil, 16*pageSize), + segs: segs, + cur: -1, + } +} + +func (r *segmentBufReader) Close() (err error) { + for _, s := range r.segs { + if e := s.Close(); e != nil { + err = e + } + } + return err +} + +func (r *segmentBufReader) Read(b []byte) (n int, err error) { + if !r.more { + if r.cur+1 >= len(r.segs) { + return 0, io.EOF + } + r.cur++ + r.off = 0 + r.more = true + r.buf.Reset(r.segs[r.cur]) + } + n, err = r.buf.Read(b) + r.off += n + if err != io.EOF { + return n, err + } + // Just return what we read so far, but don't signal EOF. + // Only unset more so we don't invalidate the current segment and + // offset before the next read. + r.more = false + return n, nil +} + +// Reader reads WAL records from an io.Reader. +type Reader struct { + rdr io.Reader + err error + rec []byte + buf [pageSize]byte + total int // total bytes processed. +} + +// NewReader returns a new reader. +func NewReader(r io.Reader) *Reader { + return &Reader{rdr: r} +} + +// Next advances the reader to the next records and returns true if it exists. +// It must not be called again after it returned false. +func (r *Reader) Next() bool { + err := r.next() + if errors.Cause(err) == io.EOF { + return false + } + r.err = err + return r.err == nil +} + +func (r *Reader) next() (err error) { + // We have to use r.buf since allocating byte arrays here fails escape + // analysis and ends up on the heap, even though it seemingly should not. + hdr := r.buf[:recordHeaderSize] + buf := r.buf[recordHeaderSize:] + + r.rec = r.rec[:0] + + i := 0 + for { + if _, err = io.ReadFull(r.rdr, hdr[:1]); err != nil { + return errors.Wrap(err, "read first header byte") + } + r.total++ + typ := recType(hdr[0]) + + // Gobble up zero bytes. 
+ if typ == recPageTerm { + // We are pedantic and check whether the zeros are actually up + // to a page boundary. + // It's not strictly necessary but may catch sketchy state early. + k := pageSize - (r.total % pageSize) + if k == pageSize { + continue // Initial 0 byte was last page byte. + } + n, err := io.ReadFull(r.rdr, buf[:k]) + if err != nil { + return errors.Wrap(err, "read remaining zeros") + } + r.total += n + + for _, c := range buf[:k] { + if c != 0 { + return errors.New("unexpected non-zero byte in padded page") + } + } + continue + } + n, err := io.ReadFull(r.rdr, hdr[1:]) + if err != nil { + return errors.Wrap(err, "read remaining header") + } + r.total += n + + var ( + length = binary.BigEndian.Uint16(hdr[1:]) + crc = binary.BigEndian.Uint32(hdr[3:]) + ) + + if length > pageSize-recordHeaderSize { + return errors.Errorf("invalid record size %d", length) + } + n, err = io.ReadFull(r.rdr, buf[:length]) + if err != nil { + return err + } + r.total += n + + if n != int(length) { + return errors.Errorf("invalid size: expected %d, got %d", length, n) + } + if c := crc32.Checksum(buf[:length], castagnoliTable); c != crc { + return errors.Errorf("unexpected checksum %x, expected %x", c, crc) + } + r.rec = append(r.rec, buf[:length]...) + + switch typ { + case recFull: + if i != 0 { + return errors.New("unexpected full record") + } + return nil + case recFirst: + if i != 0 { + return errors.New("unexpected first record") + } + case recMiddle: + if i == 0 { + return errors.New("unexpected middle record") + } + case recLast: + if i == 0 { + return errors.New("unexpected last record") + } + return nil + default: + return errors.Errorf("unexpected record type %d", typ) + } + // Only increment i for non-zero records since we use it + // to determine valid content record sequences. + i++ + } +} + +// Err returns the last encountered error wrapped in a corruption error. 
+// If the reader does not allow to infer a segment index and offset, a total +// offset in the reader stream will be provided. +func (r *Reader) Err() error { + if r.err == nil { + return nil + } + if b, ok := r.rdr.(*segmentBufReader); ok { + return &CorruptionErr{ + Err: r.err, + Segment: b.segs[b.cur].Index(), + Offset: b.off, + } + } + return &CorruptionErr{ + Err: r.err, + Segment: -1, + Offset: r.total, + } +} + +// Record returns the current record. The returned byte slice is only +// valid until the next call to Next. +func (r *Reader) Record() []byte { + return r.rec +} + +func min(i, j int) int { + if i < j { + return i + } + return j +} diff --git a/wal/wal_test.go b/wal/wal_test.go new file mode 100644 index 0000000000..d1b724c7ed --- /dev/null +++ b/wal/wal_test.go @@ -0,0 +1,361 @@ +// Copyright 2017 The Prometheus Authors + +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package wal + +import ( + "bytes" + "encoding/binary" + "hash/crc32" + "io/ioutil" + "math/rand" + "os" + "testing" + + "github.com/prometheus/tsdb/testutil" +) + +func encodedRecord(t recType, b []byte) []byte { + if t == recPageTerm { + return append([]byte{0}, b...) + } + r := make([]byte, recordHeaderSize) + r[0] = byte(t) + binary.BigEndian.PutUint16(r[1:], uint16(len(b))) + binary.BigEndian.PutUint32(r[3:], crc32.Checksum(b, castagnoliTable)) + return append(r, b...) +} + +// TestReader feeds the reader a stream of encoded records with different types. 
+func TestReader(t *testing.T) { + data := make([]byte, 100000) + _, err := rand.Read(data) + testutil.Ok(t, err) + + type record struct { + t recType + b []byte + } + cases := []struct { + t []record + exp [][]byte + fail bool + }{ + // Sequence of valid records. + { + t: []record{ + {recFull, data[0:200]}, + {recFirst, data[200:300]}, + {recLast, data[300:400]}, + {recFirst, data[400:800]}, + {recMiddle, data[800:900]}, + {recPageTerm, make([]byte, pageSize-900-recordHeaderSize*5-1)}, // exactly lines up with page boundary. + {recLast, data[900:900]}, + {recFirst, data[900:1000]}, + {recMiddle, data[1000:1200]}, + {recMiddle, data[1200:30000]}, + {recMiddle, data[30000:30001]}, + {recMiddle, data[30001:30001]}, + {recLast, data[30001:32000]}, + }, + exp: [][]byte{ + data[0:200], + data[200:400], + data[400:900], + data[900:32000], + }, + }, + // Exactly at the limit of one page minus the header size + { + t: []record{ + {recFull, data[0 : pageSize-recordHeaderSize]}, + }, + exp: [][]byte{ + data[:pageSize-recordHeaderSize], + }, + }, + // More than a full page, this exceeds our buffer and can never happen + // when written by the WAL. + { + t: []record{ + {recFull, data[0 : pageSize+1]}, + }, + fail: true, + }, + // Invalid orders of record types. + { + t: []record{{recMiddle, data[:200]}}, + fail: true, + }, + { + t: []record{{recLast, data[:200]}}, + fail: true, + }, + { + t: []record{ + {recFirst, data[:200]}, + {recFull, data[200:400]}, + }, + fail: true, + }, + { + t: []record{ + {recFirst, data[:100]}, + {recMiddle, data[100:200]}, + {recFull, data[200:400]}, + }, + fail: true, + }, + // Non-zero data after page termination. + { + t: []record{ + {recFull, data[:100]}, + {recPageTerm, append(make([]byte, 1000), 1)}, + }, + exp: [][]byte{data[:100]}, + fail: true, + }, + } + for i, c := range cases { + t.Logf("test %d", i) + + var buf []byte + for _, r := range c.t { + buf = append(buf, encodedRecord(r.t, r.b)...) 
+ } + r := NewReader(bytes.NewReader(buf)) + + for j := 0; r.Next(); j++ { + t.Logf("record %d", j) + rec := r.Record() + + if j >= len(c.exp) { + t.Fatal("received more records than inserted") + } + testutil.Equals(t, c.exp[j], rec) + } + if !c.fail && r.Err() != nil { + t.Fatalf("unexpected error: %s", r.Err()) + } + if c.fail && r.Err() == nil { + t.Fatalf("expected error but got none") + } + } +} + +func TestWAL_FuzzWriteRead(t *testing.T) { + const count = 25000 + + dir, err := ioutil.TempDir("", "walfuzz") + testutil.Ok(t, err) + defer os.RemoveAll(dir) + + w, err := NewSize(nil, nil, dir, 128*pageSize) + testutil.Ok(t, err) + + var input [][]byte + var recs [][]byte + + for i := 0; i < count; i++ { + var sz int + switch i % 5 { + case 0, 1: + sz = 50 + case 2, 3: + sz = pageSize + default: + sz = 8 * pageSize + } + rec := make([]byte, rand.Intn(sz)) + _, err := rand.Read(rec) + testutil.Ok(t, err) + + input = append(input, rec) + recs = append(recs, rec) + + // Randomly batch up records. 
+ if rand.Intn(4) < 3 { + testutil.Ok(t, w.Log(recs...)) + recs = recs[:0] + } + } + testutil.Ok(t, w.Log(recs...)) + + m, n, err := w.Segments() + testutil.Ok(t, err) + + rc, err := NewSegmentsRangeReader(dir, m, n) + testutil.Ok(t, err) + defer rc.Close() + + rdr := NewReader(rc) + + for i := 0; rdr.Next(); i++ { + rec := rdr.Record() + if i >= len(input) { + t.Fatal("read too many records") + } + if !bytes.Equal(input[i], rec) { + t.Fatalf("record %d (len %d) does not match (expected len %d)", + i, len(rec), len(input[i])) + } + } + testutil.Ok(t, rdr.Err()) +} + +func TestWAL_Repair(t *testing.T) { + for name, cf := range map[string]func(f *os.File){ + "bad_fragment_sequence": func(f *os.File) { + _, err := f.Seek(pageSize, 0) + testutil.Ok(t, err) + _, err = f.Write([]byte{byte(recLast)}) + testutil.Ok(t, err) + }, + "bad_fragment_flag": func(f *os.File) { + _, err := f.Seek(pageSize, 0) + testutil.Ok(t, err) + _, err = f.Write([]byte{123}) + testutil.Ok(t, err) + }, + "bad_checksum": func(f *os.File) { + _, err := f.Seek(pageSize+4, 0) + testutil.Ok(t, err) + _, err = f.Write([]byte{0}) + testutil.Ok(t, err) + }, + "bad_length": func(f *os.File) { + _, err := f.Seek(pageSize+2, 0) + testutil.Ok(t, err) + _, err = f.Write([]byte{0}) + testutil.Ok(t, err) + }, + "bad_content": func(f *os.File) { + _, err := f.Seek(pageSize+100, 0) + testutil.Ok(t, err) + _, err = f.Write([]byte("beef")) + testutil.Ok(t, err) + }, + } { + t.Run(name, func(t *testing.T) { + dir, err := ioutil.TempDir("", "wal_repair") + testutil.Ok(t, err) + defer os.RemoveAll(dir) + + // We create 3 segments with 3 records each and then corrupt the 2nd record + // of the 2nd segment. + // As a result we want a repaired WAL with the first 4 records intact. 
+ w, err := NewSize(nil, nil, dir, 3*pageSize) + testutil.Ok(t, err) + + var records [][]byte + + for i := 1; i <= 9; i++ { + b := make([]byte, pageSize-recordHeaderSize) + b[0] = byte(i) + records = append(records, b) + testutil.Ok(t, w.Log(b)) + } + testutil.Ok(t, w.Close()) + + f, err := os.OpenFile(SegmentName(dir, 1), os.O_RDWR, 0666) + testutil.Ok(t, err) + + // Apply corruption function. + cf(f) + + testutil.Ok(t, f.Close()) + + w, err = New(nil, nil, dir) + testutil.Ok(t, err) + + sr, err := NewSegmentsReader(dir) + testutil.Ok(t, err) + r := NewReader(sr) + + for r.Next() { + } + testutil.NotOk(t, r.Err()) + + testutil.Ok(t, w.Repair(r.Err())) + + sr, err = NewSegmentsReader(dir) + testutil.Ok(t, err) + r = NewReader(sr) + + var result [][]byte + for r.Next() { + var b []byte + result = append(result, append(b, r.Record()...)) + } + testutil.Ok(t, r.Err()) + testutil.Equals(t, 4, len(result)) + + for i, r := range result { + if !bytes.Equal(records[i], r) { + t.Fatalf("record %d diverges: want %x, got %x", i, records[i][:10], r[:10]) + } + } + }) + } +} + +func BenchmarkWAL_LogBatched(b *testing.B) { + dir, err := ioutil.TempDir("", "bench_logbatch") + testutil.Ok(b, err) + defer os.RemoveAll(dir) + + w, err := New(nil, nil, "testdir") + testutil.Ok(b, err) + defer w.Close() + + var buf [2048]byte + var recs [][]byte + b.SetBytes(2048) + + for i := 0; i < b.N; i++ { + recs = append(recs, buf[:]) + if len(recs) < 1000 { + continue + } + err := w.Log(recs...) + testutil.Ok(b, err) + recs = recs[:0] + } + // Stop timer to not count fsync time on close. + // If it's counted batched vs. single benchmarks are very similar but + // do not show burst throughput well. 
+ b.StopTimer() +} + +func BenchmarkWAL_Log(b *testing.B) { + dir, err := ioutil.TempDir("", "bench_logsingle") + testutil.Ok(b, err) + defer os.RemoveAll(dir) + + w, err := New(nil, nil, "testdir") + testutil.Ok(b, err) + defer w.Close() + + var buf [2048]byte + b.SetBytes(2048) + + for i := 0; i < b.N; i++ { + err := w.Log(buf[:]) + testutil.Ok(b, err) + } + // Stop timer to not count fsync time on close. + // If it's counted batched vs. single benchmarks are very similar but + // do not show burst throughput well. + b.StopTimer() +} diff --git a/wal_test.go b/wal_test.go index 6d559f5d9d..b16680a994 100644 --- a/wal_test.go +++ b/wal_test.go @@ -19,6 +19,7 @@ import ( "io/ioutil" "math/rand" "os" + "path" "testing" "time" @@ -26,6 +27,7 @@ import ( "github.com/prometheus/tsdb/fileutil" "github.com/prometheus/tsdb/labels" "github.com/prometheus/tsdb/testutil" + "github.com/prometheus/tsdb/wal" ) func TestSegmentWAL_cut(t *testing.T) { @@ -431,3 +433,117 @@ func TestWALRestoreCorrupted(t *testing.T) { }) } } + +func TestMigrateWAL_Empty(t *testing.T) { + // The migration proecedure must properly deal with a zero-length segment, + // which is valid in the new format. + dir, err := ioutil.TempDir("", "walmigrate") + testutil.Ok(t, err) + defer os.RemoveAll(dir) + + wdir := path.Join(dir, "wal") + + // Initialize empty WAL. + w, err := wal.New(nil, nil, wdir) + testutil.Ok(t, err) + testutil.Ok(t, w.Close()) + + testutil.Ok(t, MigrateWAL(nil, wdir)) +} + +func TestMigrateWAL_Fuzz(t *testing.T) { + dir, err := ioutil.TempDir("", "walmigrate") + testutil.Ok(t, err) + defer os.RemoveAll(dir) + + wdir := path.Join(dir, "wal") + + // Should pass if no WAL exists yet. + testutil.Ok(t, MigrateWAL(nil, wdir)) + + oldWAL, err := OpenSegmentWAL(wdir, nil, time.Minute, nil) + testutil.Ok(t, err) + + // Write some data. 
+ testutil.Ok(t, oldWAL.LogSeries([]RefSeries{ + {Ref: 100, Labels: labels.FromStrings("abc", "def", "123", "456")}, + {Ref: 1, Labels: labels.FromStrings("abc", "def2", "1234", "4567")}, + })) + testutil.Ok(t, oldWAL.LogSamples([]RefSample{ + {Ref: 1, T: 100, V: 200}, + {Ref: 2, T: 300, V: 400}, + })) + testutil.Ok(t, oldWAL.LogSeries([]RefSeries{ + {Ref: 200, Labels: labels.FromStrings("xyz", "def", "foo", "bar")}, + })) + testutil.Ok(t, oldWAL.LogSamples([]RefSample{ + {Ref: 3, T: 100, V: 200}, + {Ref: 4, T: 300, V: 400}, + })) + testutil.Ok(t, oldWAL.LogDeletes([]Stone{ + {ref: 1, intervals: []Interval{{100, 200}}}, + })) + + testutil.Ok(t, oldWAL.Close()) + + // Perform migration. + testutil.Ok(t, MigrateWAL(nil, wdir)) + + w, err := wal.New(nil, nil, wdir) + testutil.Ok(t, err) + + // We can properly write some new data after migration. + var enc RecordEncoder + testutil.Ok(t, w.Log(enc.Samples([]RefSample{ + {Ref: 500, T: 1, V: 1}, + }, nil))) + + testutil.Ok(t, w.Close()) + + // Read back all data. 
+ sr, err := wal.NewSegmentsReader(wdir) + testutil.Ok(t, err) + + r := wal.NewReader(sr) + var res []interface{} + var dec RecordDecoder + + for r.Next() { + rec := r.Record() + + switch dec.Type(rec) { + case RecordSeries: + s, err := dec.Series(rec, nil) + testutil.Ok(t, err) + res = append(res, s) + case RecordSamples: + s, err := dec.Samples(rec, nil) + testutil.Ok(t, err) + res = append(res, s) + case RecordTombstones: + s, err := dec.Tombstones(rec, nil) + testutil.Ok(t, err) + res = append(res, s) + default: + t.Fatalf("unknown record type %d", dec.Type(rec)) + } + } + testutil.Ok(t, r.Err()) + + testutil.Equals(t, []interface{}{ + []RefSeries{ + {Ref: 100, Labels: labels.FromStrings("abc", "def", "123", "456")}, + {Ref: 1, Labels: labels.FromStrings("abc", "def2", "1234", "4567")}, + }, + []RefSample{{Ref: 1, T: 100, V: 200}, {Ref: 2, T: 300, V: 400}}, + []RefSeries{ + {Ref: 200, Labels: labels.FromStrings("xyz", "def", "foo", "bar")}, + }, + []RefSample{{Ref: 3, T: 100, V: 200}, {Ref: 4, T: 300, V: 400}}, + []Stone{{ref: 1, intervals: []Interval{{100, 200}}}}, + []RefSample{{Ref: 500, T: 1, V: 1}}, + }, res) + + // Migrating an already migrated WAL shouldn't do anything. + testutil.Ok(t, MigrateWAL(nil, wdir)) +}