diff --git a/CHANGELOG.md b/CHANGELOG.md index 610899d72..a5b0d09be 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,5 +1,5 @@ ## master / unreleased - + - [FEATURE] Provide option to compress WAL records using Snappy. [#609](https://github.com/prometheus/tsdb/pull/609) ## 0.8.0 - [BUGFIX] Calling `Close` more than once on a querier returns an error instead of a panic. diff --git a/checkpoint.go b/checkpoint.go index d8dee28aa..eccfa62be 100644 --- a/checkpoint.go +++ b/checkpoint.go @@ -135,7 +135,7 @@ func Checkpoint(w *wal.WAL, from, to int, keep func(id uint64) bool, mint int64) if err := os.MkdirAll(cpdirtmp, 0777); err != nil { return nil, errors.Wrap(err, "create checkpoint dir") } - cp, err := wal.New(nil, nil, cpdirtmp) + cp, err := wal.New(nil, nil, cpdirtmp, w.CompressionEnabled()) if err != nil { return nil, errors.Wrap(err, "open checkpoint") } diff --git a/checkpoint_test.go b/checkpoint_test.go index fe8ee4e93..0779894b0 100644 --- a/checkpoint_test.go +++ b/checkpoint_test.go @@ -86,108 +86,112 @@ func TestDeleteCheckpoints(t *testing.T) { } func TestCheckpoint(t *testing.T) { - dir, err := ioutil.TempDir("", "test_checkpoint") - testutil.Ok(t, err) - defer func() { - testutil.Ok(t, os.RemoveAll(dir)) - }() - - var enc RecordEncoder - // Create a dummy segment to bump the initial number. - seg, err := wal.CreateSegment(dir, 100) - testutil.Ok(t, err) - testutil.Ok(t, seg.Close()) - - // Manually create checkpoint for 99 and earlier. - w, err := wal.New(nil, nil, filepath.Join(dir, "checkpoint.0099")) - testutil.Ok(t, err) - - // Add some data we expect to be around later. - err = w.Log(enc.Series([]RefSeries{ - {Ref: 0, Labels: labels.FromStrings("a", "b", "c", "0")}, - {Ref: 1, Labels: labels.FromStrings("a", "b", "c", "1")}, - }, nil)) - testutil.Ok(t, err) - testutil.Ok(t, w.Close()) - - // Start a WAL and write records to it as usual. - w, err = wal.NewSize(nil, nil, dir, 64*1024) - testutil.Ok(t, err) - - var last int64 - for i := 0; ; i++ { - _, n, err := w.Segments() - testutil.Ok(t, err) - if n >= 106 { - break - } - // Write some series initially. - if i == 0 { - b := enc.Series([]RefSeries{ - {Ref: 2, Labels: labels.FromStrings("a", "b", "c", "2")}, - {Ref: 3, Labels: labels.FromStrings("a", "b", "c", "3")}, - {Ref: 4, Labels: labels.FromStrings("a", "b", "c", "4")}, - {Ref: 5, Labels: labels.FromStrings("a", "b", "c", "5")}, - }, nil) - testutil.Ok(t, w.Log(b)) - } - // Write samples until the WAL has enough segments. - // Make them have drifting timestamps within a record to see that they - // get filtered properly. - b := enc.Samples([]RefSample{ - {Ref: 0, T: last, V: float64(i)}, - {Ref: 1, T: last + 10000, V: float64(i)}, - {Ref: 2, T: last + 20000, V: float64(i)}, - {Ref: 3, T: last + 30000, V: float64(i)}, - }, nil) - testutil.Ok(t, w.Log(b)) - - last += 100 - } - testutil.Ok(t, w.Close()) - - _, err = Checkpoint(w, 100, 106, func(x uint64) bool { - return x%2 == 0 - }, last/2) - testutil.Ok(t, err) - testutil.Ok(t, w.Truncate(107)) - testutil.Ok(t, DeleteCheckpoints(w.Dir(), 106)) - - // Only the new checkpoint should be left. 
- files, err := fileutil.ReadDir(dir) - testutil.Ok(t, err) - testutil.Equals(t, 1, len(files)) - testutil.Equals(t, "checkpoint.000106", files[0]) - - sr, err := wal.NewSegmentsReader(filepath.Join(dir, "checkpoint.000106")) - testutil.Ok(t, err) - defer sr.Close() - - var dec RecordDecoder - var series []RefSeries - r := wal.NewReader(sr) - - for r.Next() { - rec := r.Record() - - switch dec.Type(rec) { - case RecordSeries: - series, err = dec.Series(rec, series) + for _, compress := range []bool{false, true} { + t.Run(fmt.Sprintf("compress=%t", compress), func(t *testing.T) { + dir, err := ioutil.TempDir("", "test_checkpoint") testutil.Ok(t, err) - case RecordSamples: - samples, err := dec.Samples(rec, nil) + defer func() { + testutil.Ok(t, os.RemoveAll(dir)) + }() + + var enc RecordEncoder + // Create a dummy segment to bump the initial number. + seg, err := wal.CreateSegment(dir, 100) testutil.Ok(t, err) - for _, s := range samples { - testutil.Assert(t, s.T >= last/2, "sample with wrong timestamp") + testutil.Ok(t, seg.Close()) + + // Manually create checkpoint for 99 and earlier. + w, err := wal.New(nil, nil, filepath.Join(dir, "checkpoint.0099"), compress) + testutil.Ok(t, err) + + // Add some data we expect to be around later. + err = w.Log(enc.Series([]RefSeries{ + {Ref: 0, Labels: labels.FromStrings("a", "b", "c", "0")}, + {Ref: 1, Labels: labels.FromStrings("a", "b", "c", "1")}, + }, nil)) + testutil.Ok(t, err) + testutil.Ok(t, w.Close()) + + // Start a WAL and write records to it as usual. + w, err = wal.NewSize(nil, nil, dir, 64*1024, compress) + testutil.Ok(t, err) + + var last int64 + for i := 0; ; i++ { + _, n, err := w.Segments() + testutil.Ok(t, err) + if n >= 106 { + break + } + // Write some series initially. + if i == 0 { + b := enc.Series([]RefSeries{ + {Ref: 2, Labels: labels.FromStrings("a", "b", "c", "2")}, + {Ref: 3, Labels: labels.FromStrings("a", "b", "c", "3")}, + {Ref: 4, Labels: labels.FromStrings("a", "b", "c", "4")}, + {Ref: 5, Labels: labels.FromStrings("a", "b", "c", "5")}, + }, nil) + testutil.Ok(t, w.Log(b)) + } + // Write samples until the WAL has enough segments. + // Make them have drifting timestamps within a record to see that they + // get filtered properly. + b := enc.Samples([]RefSample{ + {Ref: 0, T: last, V: float64(i)}, + {Ref: 1, T: last + 10000, V: float64(i)}, + {Ref: 2, T: last + 20000, V: float64(i)}, + {Ref: 3, T: last + 30000, V: float64(i)}, + }, nil) + testutil.Ok(t, w.Log(b)) + + last += 100 } - } + testutil.Ok(t, w.Close()) + + _, err = Checkpoint(w, 100, 106, func(x uint64) bool { + return x%2 == 0 + }, last/2) + testutil.Ok(t, err) + testutil.Ok(t, w.Truncate(107)) + testutil.Ok(t, DeleteCheckpoints(w.Dir(), 106)) + + // Only the new checkpoint should be left. 
+ files, err := fileutil.ReadDir(dir) + testutil.Ok(t, err) + testutil.Equals(t, 1, len(files)) + testutil.Equals(t, "checkpoint.000106", files[0]) + + sr, err := wal.NewSegmentsReader(filepath.Join(dir, "checkpoint.000106")) + testutil.Ok(t, err) + defer sr.Close() + + var dec RecordDecoder + var series []RefSeries + r := wal.NewReader(sr) + + for r.Next() { + rec := r.Record() + + switch dec.Type(rec) { + case RecordSeries: + series, err = dec.Series(rec, series) + testutil.Ok(t, err) + case RecordSamples: + samples, err := dec.Samples(rec, nil) + testutil.Ok(t, err) + for _, s := range samples { + testutil.Assert(t, s.T >= last/2, "sample with wrong timestamp") + } + } + } + testutil.Ok(t, r.Err()) + testutil.Equals(t, []RefSeries{ + {Ref: 0, Labels: labels.FromStrings("a", "b", "c", "0")}, + {Ref: 2, Labels: labels.FromStrings("a", "b", "c", "2")}, + {Ref: 4, Labels: labels.FromStrings("a", "b", "c", "4")}, + }, series) + }) } - testutil.Ok(t, r.Err()) - testutil.Equals(t, []RefSeries{ - {Ref: 0, Labels: labels.FromStrings("a", "b", "c", "0")}, - {Ref: 2, Labels: labels.FromStrings("a", "b", "c", "2")}, - {Ref: 4, Labels: labels.FromStrings("a", "b", "c", "4")}, - }, series) } func TestCheckpointNoTmpFolderAfterError(t *testing.T) { @@ -197,7 +201,7 @@ func TestCheckpointNoTmpFolderAfterError(t *testing.T) { defer func() { testutil.Ok(t, os.RemoveAll(dir)) }() - w, err := wal.NewSize(nil, nil, dir, 64*1024) + w, err := wal.NewSize(nil, nil, dir, 64*1024, false) testutil.Ok(t, err) testutil.Ok(t, w.Log([]byte{99})) w.Close() diff --git a/db.go b/db.go index e80d3bd2d..127019b47 100644 --- a/db.go +++ b/db.go @@ -51,6 +51,7 @@ var DefaultOptions = &Options{ BlockRanges: ExponentialBlockRanges(int64(2*time.Hour)/1e6, 3, 5), NoLockfile: false, AllowOverlappingBlocks: false, + WALCompression: false, } // Options of the DB storage. @@ -80,6 +81,9 @@ type Options struct { // Overlapping blocks are allowed if AllowOverlappingBlocks is true. // This in-turn enables vertical compaction and vertical query merge. AllowOverlappingBlocks bool + + // WALCompression will turn on Snappy compression for records on the WAL. + WALCompression bool } // Appender allows appending a batch of data. 
It must be completed with a
@@ -306,7 +310,7 @@ func Open(dir string, l log.Logger, r prometheus.Registerer, opts *Options) (db
 	if opts.WALSegmentSize > 0 {
 		segmentSize = opts.WALSegmentSize
 	}
-	wlog, err = wal.NewSize(l, r, filepath.Join(dir, "wal"), segmentSize)
+	wlog, err = wal.NewSize(l, r, filepath.Join(dir, "wal"), segmentSize, opts.WALCompression)
 	if err != nil {
 		return nil, err
 	}
diff --git a/db_test.go b/db_test.go
index 6014a1d00..20dcf6306 100644
--- a/db_test.go
+++ b/db_test.go
@@ -1404,7 +1404,7 @@ func TestInitializeHeadTimestamp(t *testing.T) {
 	}()
 	testutil.Ok(t, os.MkdirAll(path.Join(dir, "wal"), 0777))
 
-	w, err := wal.New(nil, nil, path.Join(dir, "wal"))
+	w, err := wal.New(nil, nil, path.Join(dir, "wal"), false)
 	testutil.Ok(t, err)
 
 	var enc RecordEncoder
@@ -1454,7 +1454,7 @@ func TestInitializeHeadTimestamp(t *testing.T) {
 	createBlock(t, dir, genSeries(1, 1, 1000, 6000))
 	testutil.Ok(t, os.MkdirAll(path.Join(dir, "wal"), 0777))
 
-	w, err := wal.New(nil, nil, path.Join(dir, "wal"))
+	w, err := wal.New(nil, nil, path.Join(dir, "wal"), false)
 	testutil.Ok(t, err)
 
 	var enc RecordEncoder
diff --git a/go.mod b/go.mod
index 9e52983cb..ccdd43724 100644
--- a/go.mod
+++ b/go.mod
@@ -4,6 +4,7 @@ require (
 	github.com/cespare/xxhash v1.1.0
 	github.com/dgryski/go-sip13 v0.0.0-20181026042036-e10d5fee7954
 	github.com/go-kit/kit v0.8.0
+	github.com/golang/snappy v0.0.1
 	github.com/oklog/ulid v1.3.1
 	github.com/pkg/errors v0.8.0
 	github.com/prometheus/client_golang v1.0.0
diff --git a/go.sum b/go.sum
index da7773f9b..365fa5ecf 100644
--- a/go.sum
+++ b/go.sum
@@ -30,6 +30,8 @@ github.com/golang/protobuf v1.3.1/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5y
+github.com/golang/snappy v0.0.1 h1:Qgr9rKW7uDUkrbSmQeiDsGa8SjGyCOGtuasMWwvp2P4=
+github.com/golang/snappy v0.0.1/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q=
 github.com/json-iterator/go v1.1.6/go.mod h1:+SdeFBvtyEkXs7REEP0seUULqWtbJapLOCVDaaPEHmU=
 github.com/julienschmidt/httprouter v1.2.0/go.mod h1:SYymIcj16QtmaHHD7aYtjjsJG7VTCxuUUipMqKk8s4w=
 github.com/konsorten/go-windows-terminal-sequences v1.0.1/go.mod h1:T0+1ngSBFLxvqU3pZ+m/2kptfBszLMUkC4ZK/EgS/cQ=
 github.com/kr/logfmt v0.0.0-20140226030751-b84e30acd515 h1:T+h1c/A9Gawja4Y9mFVWj2vyii2bbUNDw3kt9VxK2EY=
 github.com/kr/logfmt v0.0.0-20140226030751-b84e30acd515/go.mod h1:+0opPa2QZZtGFBFZlji/RkVcI2GknAs/DXo4wKdlNEc=
 github.com/matttproud/golang_protobuf_extensions v1.0.1 h1:4hp9jkHxhMHkqkrB3Ix0jegS5sx/RkqARlsWZ6pIwiU=
diff --git a/head_test.go b/head_test.go
index b10294f73..55067eae3 100644
--- a/head_test.go
+++ b/head_test.go
@@ -14,6 +14,7 @@ package tsdb
 import (
+	"fmt"
 	"io/ioutil"
 	"math"
 	"math/rand"
@@ -96,70 +97,73 @@ func readTestWAL(t testing.TB, dir string) (recs []interface{}) {
 }
 
 func TestHead_ReadWAL(t *testing.T) {
-	entries := []interface{}{
-		[]RefSeries{
-			{Ref: 10, Labels: labels.FromStrings("a", "1")},
-			{Ref: 11, Labels: labels.FromStrings("a", "2")},
-			{Ref: 100, Labels: labels.FromStrings("a", "3")},
-		},
-		[]RefSample{
-			{Ref: 0, T: 99, V: 1},
-			{Ref: 10, T: 100, V: 2},
-			{Ref: 100, T: 100, V: 3},
-		},
-		[]RefSeries{
-			{Ref: 50, Labels: labels.FromStrings("a", "4")},
-			// This series has two refs pointing to it.
-			{Ref: 101, Labels: labels.FromStrings("a", "3")},
-		},
-		[]RefSample{
-			{Ref: 10, T: 101, V: 5},
-			{Ref: 50, T: 101, V: 6},
-			{Ref: 101, T: 101, V: 7},
-		},
-		[]Stone{
-			{ref: 0, intervals: []Interval{{Mint: 99, Maxt: 101}}},
-		},
+	for _, compress := range []bool{false, true} {
+		t.Run(fmt.Sprintf("compress=%t", compress), func(t *testing.T) {
+			entries := []interface{}{
+				[]RefSeries{
+					{Ref: 10, Labels: labels.FromStrings("a", "1")},
+					{Ref: 11, Labels: labels.FromStrings("a", "2")},
+					{Ref: 100, Labels: labels.FromStrings("a", "3")},
+				},
+				[]RefSample{
+					{Ref: 0, T: 99, V: 1},
+					{Ref: 10, T: 100, V: 2},
+					{Ref: 100, T: 100, V: 3},
+				},
+				[]RefSeries{
+					{Ref: 50, Labels: labels.FromStrings("a", "4")},
+					// This series has two refs pointing to it.
+					{Ref: 101, Labels: labels.FromStrings("a", "3")},
+				},
+				[]RefSample{
+					{Ref: 10, T: 101, V: 5},
+					{Ref: 50, T: 101, V: 6},
+					{Ref: 101, T: 101, V: 7},
+				},
+				[]Stone{
+					{ref: 0, intervals: []Interval{{Mint: 99, Maxt: 101}}},
+				},
+			}
+			dir, err := ioutil.TempDir("", "test_read_wal")
+			testutil.Ok(t, err)
+			defer func() {
+				testutil.Ok(t, os.RemoveAll(dir))
+			}()
+
+			w, err := wal.New(nil, nil, dir, compress)
+			testutil.Ok(t, err)
+			defer w.Close()
+			populateTestWAL(t, w, entries)
+
+			head, err := NewHead(nil, nil, w, 1000)
+			testutil.Ok(t, err)
+
+			testutil.Ok(t, head.Init(math.MinInt64))
+			testutil.Equals(t, uint64(101), head.lastSeriesID)
+
+			s10 := head.series.getByID(10)
+			s11 := head.series.getByID(11)
+			s50 := head.series.getByID(50)
+			s100 := head.series.getByID(100)
+
+			testutil.Equals(t, labels.FromStrings("a", "1"), s10.lset)
+			testutil.Equals(t, (*memSeries)(nil), s11) // Series without samples should be garbage collected at head.Init().
+			testutil.Equals(t, labels.FromStrings("a", "4"), s50.lset)
+			testutil.Equals(t, labels.FromStrings("a", "3"), s100.lset)
+
+			expandChunk := func(c chunkenc.Iterator) (x []sample) {
+				for c.Next() {
+					t, v := c.At()
+					x = append(x, sample{t: t, v: v})
+				}
+				testutil.Ok(t, c.Err())
+				return x
+			}
+			testutil.Equals(t, []sample{{100, 2}, {101, 5}}, expandChunk(s10.iterator(0)))
+			testutil.Equals(t, []sample{{101, 6}}, expandChunk(s50.iterator(0)))
+			testutil.Equals(t, []sample{{100, 3}, {101, 7}}, expandChunk(s100.iterator(0)))
+		})
 	}
-	dir, err := ioutil.TempDir("", "test_read_wal")
-	testutil.Ok(t, err)
-	defer func() {
-		testutil.Ok(t, os.RemoveAll(dir))
-	}()
-
-	w, err := wal.New(nil, nil, dir)
-	testutil.Ok(t, err)
-	defer w.Close()
-	populateTestWAL(t, w, entries)
-
-	head, err := NewHead(nil, nil, w, 1000)
-	testutil.Ok(t, err)
-
-	testutil.Ok(t, head.Init(math.MinInt64))
-	testutil.Equals(t, uint64(101), head.lastSeriesID)
-
-	s10 := head.series.getByID(10)
-	s11 := head.series.getByID(11)
-	s50 := head.series.getByID(50)
-	s100 := head.series.getByID(100)
-
-	testutil.Equals(t, labels.FromStrings("a", "1"), s10.lset)
-	testutil.Equals(t, (*memSeries)(nil), s11) // Series without samples should be garbage colected at head.Init().
- testutil.Equals(t, labels.FromStrings("a", "4"), s50.lset) - testutil.Equals(t, labels.FromStrings("a", "3"), s100.lset) - - expandChunk := func(c chunkenc.Iterator) (x []sample) { - for c.Next() { - t, v := c.At() - x = append(x, sample{t: t, v: v}) - } - testutil.Ok(t, c.Err()) - return x - } - - testutil.Equals(t, []sample{{100, 2}, {101, 5}}, expandChunk(s10.iterator(0))) - testutil.Equals(t, []sample{{101, 6}}, expandChunk(s50.iterator(0))) - testutil.Equals(t, []sample{{100, 3}, {101, 7}}, expandChunk(s100.iterator(0))) } func TestHead_WALMultiRef(t *testing.T) { @@ -169,7 +173,7 @@ func TestHead_WALMultiRef(t *testing.T) { testutil.Ok(t, os.RemoveAll(dir)) }() - w, err := wal.New(nil, nil, dir) + w, err := wal.New(nil, nil, dir, false) testutil.Ok(t, err) head, err := NewHead(nil, nil, w, 1000) @@ -193,7 +197,7 @@ func TestHead_WALMultiRef(t *testing.T) { } testutil.Ok(t, head.Close()) - w, err = wal.New(nil, nil, dir) + w, err = wal.New(nil, nil, dir, false) testutil.Ok(t, err) head, err = NewHead(nil, nil, w, 1000) @@ -319,36 +323,40 @@ func TestMemSeries_truncateChunks(t *testing.T) { } func TestHeadDeleteSeriesWithoutSamples(t *testing.T) { - entries := []interface{}{ - []RefSeries{ - {Ref: 10, Labels: labels.FromStrings("a", "1")}, - }, - []RefSample{}, - []RefSeries{ - {Ref: 50, Labels: labels.FromStrings("a", "2")}, - }, - []RefSample{ - {Ref: 50, T: 80, V: 1}, - {Ref: 50, T: 90, V: 1}, - }, + for _, compress := range []bool{false, true} { + t.Run(fmt.Sprintf("compress=%t", compress), func(t *testing.T) { + entries := []interface{}{ + []RefSeries{ + {Ref: 10, Labels: labels.FromStrings("a", "1")}, + }, + []RefSample{}, + []RefSeries{ + {Ref: 50, Labels: labels.FromStrings("a", "2")}, + }, + []RefSample{ + {Ref: 50, T: 80, V: 1}, + {Ref: 50, T: 90, V: 1}, + }, + } + dir, err := ioutil.TempDir("", "test_delete_series") + testutil.Ok(t, err) + defer func() { + testutil.Ok(t, os.RemoveAll(dir)) + }() + + w, err := wal.New(nil, nil, dir, compress) + testutil.Ok(t, err) + defer w.Close() + populateTestWAL(t, w, entries) + + head, err := NewHead(nil, nil, w, 1000) + testutil.Ok(t, err) + + testutil.Ok(t, head.Init(math.MinInt64)) + + testutil.Ok(t, head.Delete(0, 100, labels.NewEqualMatcher("a", "1"))) + }) } - dir, err := ioutil.TempDir("", "test_delete_series") - testutil.Ok(t, err) - defer func() { - testutil.Ok(t, os.RemoveAll(dir)) - }() - - w, err := wal.New(nil, nil, dir) - testutil.Ok(t, err) - defer w.Close() - populateTestWAL(t, w, entries) - - head, err := NewHead(nil, nil, w, 1000) - testutil.Ok(t, err) - - testutil.Ok(t, head.Init(math.MinInt64)) - - testutil.Ok(t, head.Delete(0, 100, labels.NewEqualMatcher("a", "1"))) } func TestHeadDeleteSimple(t *testing.T) { @@ -388,129 +396,133 @@ func TestHeadDeleteSimple(t *testing.T) { }, } -Outer: - for _, c := range cases { - dir, err := ioutil.TempDir("", "test_wal_reload") - testutil.Ok(t, err) - defer func() { - testutil.Ok(t, os.RemoveAll(dir)) - }() - - w, err := wal.New(nil, nil, path.Join(dir, "wal")) - testutil.Ok(t, err) - defer w.Close() - - head, err := NewHead(nil, nil, w, 1000) - testutil.Ok(t, err) - defer head.Close() - - app := head.Appender() - for _, smpl := range smplsAll { - _, err = app.Add(labels.Labels{lblDefault}, smpl.t, smpl.v) - testutil.Ok(t, err) - - } - testutil.Ok(t, app.Commit()) - - // Delete the ranges. 
- for _, r := range c.dranges { - testutil.Ok(t, head.Delete(r.Mint, r.Maxt, labels.NewEqualMatcher(lblDefault.Name, lblDefault.Value))) - } - - // Compare the samples for both heads - before and after the reload. - reloadedW, err := wal.New(nil, nil, w.Dir()) // Use a new wal to ensure deleted samples are gone even after a reload. - testutil.Ok(t, err) - defer reloadedW.Close() - reloadedHead, err := NewHead(nil, nil, reloadedW, 1000) - testutil.Ok(t, err) - defer reloadedHead.Close() - testutil.Ok(t, reloadedHead.Init(0)) - for _, h := range []*Head{head, reloadedHead} { - indexr, err := h.Index() - testutil.Ok(t, err) - // Use an emptyTombstoneReader explicitly to get all the samples. - css, err := LookupChunkSeries(indexr, emptyTombstoneReader, labels.NewEqualMatcher(lblDefault.Name, lblDefault.Value)) - testutil.Ok(t, err) - - // Getting the actual samples. - actSamples := make([]sample, 0) - for css.Next() { - lblsAct, chkMetas, intv := css.At() - testutil.Equals(t, labels.Labels{lblDefault}, lblsAct) - testutil.Equals(t, 0, len(intv)) - - chunkr, err := h.Chunks() + for _, compress := range []bool{false, true} { + t.Run(fmt.Sprintf("compress=%t", compress), func(t *testing.T) { + Outer: + for _, c := range cases { + dir, err := ioutil.TempDir("", "test_wal_reload") testutil.Ok(t, err) - for _, meta := range chkMetas { - chk, err := chunkr.Chunk(meta.Ref) + defer func() { + testutil.Ok(t, os.RemoveAll(dir)) + }() + + w, err := wal.New(nil, nil, path.Join(dir, "wal"), compress) + testutil.Ok(t, err) + defer w.Close() + + head, err := NewHead(nil, nil, w, 1000) + testutil.Ok(t, err) + defer head.Close() + + app := head.Appender() + for _, smpl := range smplsAll { + _, err = app.Add(labels.Labels{lblDefault}, smpl.t, smpl.v) testutil.Ok(t, err) - ii := chk.Iterator() - for ii.Next() { - t, v := ii.At() - actSamples = append(actSamples, sample{t: t, v: v}) + + } + testutil.Ok(t, app.Commit()) + + // Delete the ranges. + for _, r := range c.dranges { + testutil.Ok(t, head.Delete(r.Mint, r.Maxt, labels.NewEqualMatcher(lblDefault.Name, lblDefault.Value))) + } + + // Compare the samples for both heads - before and after the reload. + reloadedW, err := wal.New(nil, nil, w.Dir(), compress) // Use a new wal to ensure deleted samples are gone even after a reload. + testutil.Ok(t, err) + defer reloadedW.Close() + reloadedHead, err := NewHead(nil, nil, reloadedW, 1000) + testutil.Ok(t, err) + defer reloadedHead.Close() + testutil.Ok(t, reloadedHead.Init(0)) + for _, h := range []*Head{head, reloadedHead} { + indexr, err := h.Index() + testutil.Ok(t, err) + // Use an emptyTombstoneReader explicitly to get all the samples. + css, err := LookupChunkSeries(indexr, emptyTombstoneReader, labels.NewEqualMatcher(lblDefault.Name, lblDefault.Value)) + testutil.Ok(t, err) + + // Getting the actual samples. + actSamples := make([]sample, 0) + for css.Next() { + lblsAct, chkMetas, intv := css.At() + testutil.Equals(t, labels.Labels{lblDefault}, lblsAct) + testutil.Equals(t, 0, len(intv)) + + chunkr, err := h.Chunks() + testutil.Ok(t, err) + for _, meta := range chkMetas { + chk, err := chunkr.Chunk(meta.Ref) + testutil.Ok(t, err) + ii := chk.Iterator() + for ii.Next() { + t, v := ii.At() + actSamples = append(actSamples, sample{t: t, v: v}) + } + } + } + + testutil.Ok(t, css.Err()) + testutil.Equals(t, c.smplsExp, actSamples) + } + + // Compare the query results for both heads - before and after the reload. 
+ expSeriesSet := newMockSeriesSet([]Series{ + newSeries(map[string]string{lblDefault.Name: lblDefault.Value}, func() []tsdbutil.Sample { + ss := make([]tsdbutil.Sample, 0, len(c.smplsExp)) + for _, s := range c.smplsExp { + ss = append(ss, s) + } + return ss + }(), + ), + }) + for _, h := range []*Head{head, reloadedHead} { + q, err := NewBlockQuerier(h, h.MinTime(), h.MaxTime()) + testutil.Ok(t, err) + actSeriesSet, err := q.Select(labels.NewEqualMatcher(lblDefault.Name, lblDefault.Value)) + testutil.Ok(t, err) + + lns, err := q.LabelNames() + testutil.Ok(t, err) + lvs, err := q.LabelValues(lblDefault.Name) + testutil.Ok(t, err) + // When all samples are deleted we expect that no labels should exist either. + if len(c.smplsExp) == 0 { + testutil.Equals(t, 0, len(lns)) + testutil.Equals(t, 0, len(lvs)) + testutil.Assert(t, actSeriesSet.Next() == false, "") + testutil.Ok(t, h.Close()) + continue + } else { + testutil.Equals(t, 1, len(lns)) + testutil.Equals(t, 1, len(lvs)) + testutil.Equals(t, lblDefault.Name, lns[0]) + testutil.Equals(t, lblDefault.Value, lvs[0]) + } + + for { + eok, rok := expSeriesSet.Next(), actSeriesSet.Next() + testutil.Equals(t, eok, rok) + + if !eok { + testutil.Ok(t, h.Close()) + continue Outer + } + expSeries := expSeriesSet.At() + actSeries := actSeriesSet.At() + + testutil.Equals(t, expSeries.Labels(), actSeries.Labels()) + + smplExp, errExp := expandSeriesIterator(expSeries.Iterator()) + smplRes, errRes := expandSeriesIterator(actSeries.Iterator()) + + testutil.Equals(t, errExp, errRes) + testutil.Equals(t, smplExp, smplRes) } } } - - testutil.Ok(t, css.Err()) - testutil.Equals(t, c.smplsExp, actSamples) - } - - // Compare the query results for both heads - before and after the reload. - expSeriesSet := newMockSeriesSet([]Series{ - newSeries(map[string]string{lblDefault.Name: lblDefault.Value}, func() []tsdbutil.Sample { - ss := make([]tsdbutil.Sample, 0, len(c.smplsExp)) - for _, s := range c.smplsExp { - ss = append(ss, s) - } - return ss - }(), - ), }) - for _, h := range []*Head{head, reloadedHead} { - q, err := NewBlockQuerier(h, h.MinTime(), h.MaxTime()) - testutil.Ok(t, err) - actSeriesSet, err := q.Select(labels.NewEqualMatcher(lblDefault.Name, lblDefault.Value)) - testutil.Ok(t, err) - - lns, err := q.LabelNames() - testutil.Ok(t, err) - lvs, err := q.LabelValues(lblDefault.Name) - testutil.Ok(t, err) - // When all samples are deleted we expect that no labels should exist either. 
- if len(c.smplsExp) == 0 { - testutil.Equals(t, 0, len(lns)) - testutil.Equals(t, 0, len(lvs)) - testutil.Assert(t, actSeriesSet.Next() == false, "") - testutil.Ok(t, h.Close()) - continue - } else { - testutil.Equals(t, 1, len(lns)) - testutil.Equals(t, 1, len(lvs)) - testutil.Equals(t, lblDefault.Name, lns[0]) - testutil.Equals(t, lblDefault.Value, lvs[0]) - } - - for { - eok, rok := expSeriesSet.Next(), actSeriesSet.Next() - testutil.Equals(t, eok, rok) - - if !eok { - testutil.Ok(t, h.Close()) - continue Outer - } - expSeries := expSeriesSet.At() - actSeries := actSeriesSet.At() - - testutil.Equals(t, expSeries.Labels(), actSeries.Labels()) - - smplExp, errExp := expandSeriesIterator(expSeries.Iterator()) - smplRes, errRes := expandSeriesIterator(actSeries.Iterator()) - - testutil.Equals(t, errExp, errRes) - testutil.Equals(t, smplExp, smplRes) - } - } } } @@ -559,7 +571,7 @@ func TestDeletedSamplesAndSeriesStillInWALAfterCheckpoint(t *testing.T) { defer func() { testutil.Ok(t, os.RemoveAll(dir)) }() - wlog, err := wal.NewSize(nil, nil, dir, 32768) + wlog, err := wal.NewSize(nil, nil, dir, 32768, false) testutil.Ok(t, err) // Enough samples to cause a checkpoint. @@ -1019,30 +1031,34 @@ func TestRemoveSeriesAfterRollbackAndTruncate(t *testing.T) { } func TestHead_LogRollback(t *testing.T) { - dir, err := ioutil.TempDir("", "wal_rollback") - testutil.Ok(t, err) - defer func() { - testutil.Ok(t, os.RemoveAll(dir)) - }() + for _, compress := range []bool{false, true} { + t.Run(fmt.Sprintf("compress=%t", compress), func(t *testing.T) { + dir, err := ioutil.TempDir("", "wal_rollback") + testutil.Ok(t, err) + defer func() { + testutil.Ok(t, os.RemoveAll(dir)) + }() - w, err := wal.New(nil, nil, dir) - testutil.Ok(t, err) - defer w.Close() - h, err := NewHead(nil, nil, w, 1000) - testutil.Ok(t, err) + w, err := wal.New(nil, nil, dir, compress) + testutil.Ok(t, err) + defer w.Close() + h, err := NewHead(nil, nil, w, 1000) + testutil.Ok(t, err) - app := h.Appender() - _, err = app.Add(labels.FromStrings("a", "b"), 1, 2) - testutil.Ok(t, err) + app := h.Appender() + _, err = app.Add(labels.FromStrings("a", "b"), 1, 2) + testutil.Ok(t, err) - testutil.Ok(t, app.Rollback()) - recs := readTestWAL(t, w.Dir()) + testutil.Ok(t, app.Rollback()) + recs := readTestWAL(t, w.Dir()) - testutil.Equals(t, 1, len(recs)) + testutil.Equals(t, 1, len(recs)) - series, ok := recs[0].([]RefSeries) - testutil.Assert(t, ok, "expected series record but got %+v", recs[0]) - testutil.Equals(t, []RefSeries{{Ref: 1, Labels: labels.FromStrings("a", "b")}}, series) + series, ok := recs[0].([]RefSeries) + testutil.Assert(t, ok, "expected series record but got %+v", recs[0]) + testutil.Equals(t, []RefSeries{{Ref: 1, Labels: labels.FromStrings("a", "b")}}, series) + }) + } } // TestWalRepair_DecodingError ensures that a repair is run for an error @@ -1057,8 +1073,11 @@ func TestWalRepair_DecodingError(t *testing.T) { }{ "invalid_record": { func(rec []byte) []byte { - rec[0] = byte(RecordInvalid) - return rec + // Do not modify the base record because it is Logged multiple times. 
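+				// The same test.rec slice is logged on every iteration of the write
+				// loop below, so mutating it in place would also corrupt the records
+				// that are expected to stay intact.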
+ res := make([]byte, len(rec)) + copy(res, rec) + res[0] = byte(RecordInvalid) + return res }, enc.Series([]RefSeries{{Ref: 1, Labels: labels.FromStrings("a", "b")}}, []byte{}), 9, @@ -1089,65 +1108,66 @@ func TestWalRepair_DecodingError(t *testing.T) { 5, }, } { - t.Run(name, func(t *testing.T) { - dir, err := ioutil.TempDir("", "wal_repair") - testutil.Ok(t, err) - defer func() { - testutil.Ok(t, os.RemoveAll(dir)) - }() - - // Fill the wal and corrupt it. - { - w, err := wal.New(nil, nil, filepath.Join(dir, "wal")) - testutil.Ok(t, err) - - for i := 1; i <= test.totalRecs; i++ { - // At this point insert a corrupted record. - if i-1 == test.expRecs { - testutil.Ok(t, w.Log(test.corrFunc(test.rec))) - continue - } - testutil.Ok(t, w.Log(test.rec)) - } - - h, err := NewHead(nil, nil, w, 1) - testutil.Ok(t, err) - testutil.Equals(t, 0.0, prom_testutil.ToFloat64(h.metrics.walCorruptionsTotal)) - initErr := h.Init(math.MinInt64) - - err = errors.Cause(initErr) // So that we can pick up errors even if wrapped. - _, corrErr := err.(*wal.CorruptionErr) - testutil.Assert(t, corrErr, "reading the wal didn't return corruption error") - testutil.Ok(t, w.Close()) - } - - // Open the db to trigger a repair. - { - db, err := Open(dir, nil, nil, DefaultOptions) + for _, compress := range []bool{false, true} { + t.Run(fmt.Sprintf("%s,compress=%t", name, compress), func(t *testing.T) { + dir, err := ioutil.TempDir("", "wal_repair") testutil.Ok(t, err) defer func() { - testutil.Ok(t, db.Close()) + testutil.Ok(t, os.RemoveAll(dir)) }() - testutil.Equals(t, 1.0, prom_testutil.ToFloat64(db.head.metrics.walCorruptionsTotal)) - } - // Read the wal content after the repair. - { - sr, err := wal.NewSegmentsReader(filepath.Join(dir, "wal")) - testutil.Ok(t, err) - defer sr.Close() - r := wal.NewReader(sr) + // Fill the wal and corrupt it. + { + w, err := wal.New(nil, nil, filepath.Join(dir, "wal"), compress) + testutil.Ok(t, err) - var actRec int - for r.Next() { - actRec++ + for i := 1; i <= test.totalRecs; i++ { + // At this point insert a corrupted record. + if i-1 == test.expRecs { + testutil.Ok(t, w.Log(test.corrFunc(test.rec))) + continue + } + testutil.Ok(t, w.Log(test.rec)) + } + + h, err := NewHead(nil, nil, w, 1) + testutil.Ok(t, err) + testutil.Equals(t, 0.0, prom_testutil.ToFloat64(h.metrics.walCorruptionsTotal)) + initErr := h.Init(math.MinInt64) + + err = errors.Cause(initErr) // So that we can pick up errors even if wrapped. + _, corrErr := err.(*wal.CorruptionErr) + testutil.Assert(t, corrErr, "reading the wal didn't return corruption error") + testutil.Ok(t, w.Close()) } - testutil.Ok(t, r.Err()) - testutil.Equals(t, test.expRecs, actRec, "Wrong number of intact records") - } - }) - } + // Open the db to trigger a repair. + { + db, err := Open(dir, nil, nil, DefaultOptions) + testutil.Ok(t, err) + defer func() { + testutil.Ok(t, db.Close()) + }() + testutil.Equals(t, 1.0, prom_testutil.ToFloat64(db.head.metrics.walCorruptionsTotal)) + } + + // Read the wal content after the repair. 
+ { + sr, err := wal.NewSegmentsReader(filepath.Join(dir, "wal")) + testutil.Ok(t, err) + defer sr.Close() + r := wal.NewReader(sr) + + var actRec int + for r.Next() { + actRec++ + } + testutil.Ok(t, r.Err()) + testutil.Equals(t, test.expRecs, actRec, "Wrong number of intact records") + } + }) + } + } } func TestNewWalSegmentOnTruncate(t *testing.T) { @@ -1156,7 +1176,7 @@ func TestNewWalSegmentOnTruncate(t *testing.T) { defer func() { testutil.Ok(t, os.RemoveAll(dir)) }() - wlog, err := wal.NewSize(nil, nil, dir, 32768) + wlog, err := wal.NewSize(nil, nil, dir, 32768, false) testutil.Ok(t, err) h, err := NewHead(nil, nil, wlog, 1000) diff --git a/testutil/directory.go b/testutil/directory.go index d3c9c926f..e74b342b0 100644 --- a/testutil/directory.go +++ b/testutil/directory.go @@ -16,6 +16,7 @@ package testutil import ( "io/ioutil" "os" + "path/filepath" ) const ( @@ -127,3 +128,18 @@ func NewTemporaryDirectory(name string, t T) (handler TemporaryDirectory) { return } + +// DirSize returns the size in bytes of all files in a directory. +func DirSize(path string) (int64, error) { + var size int64 + err := filepath.Walk(path, func(_ string, info os.FileInfo, err error) error { + if err != nil { + return err + } + if !info.IsDir() { + size += info.Size() + } + return nil + }) + return size, err +} diff --git a/wal.go b/wal.go index 81e1e2531..49f55fe40 100644 --- a/wal.go +++ b/wal.go @@ -1246,7 +1246,7 @@ func MigrateWAL(logger log.Logger, dir string) (err error) { if err := os.RemoveAll(tmpdir); err != nil { return errors.Wrap(err, "cleanup replacement dir") } - repl, err := wal.New(logger, nil, tmpdir) + repl, err := wal.New(logger, nil, tmpdir, false) if err != nil { return errors.Wrap(err, "open new WAL") } diff --git a/wal/live_reader.go b/wal/live_reader.go index b62845fae..fb0485230 100644 --- a/wal/live_reader.go +++ b/wal/live_reader.go @@ -22,6 +22,7 @@ import ( "github.com/go-kit/kit/log" "github.com/go-kit/kit/log/level" + "github.com/golang/snappy" "github.com/pkg/errors" "github.com/prometheus/client_golang/prometheus" ) @@ -57,6 +58,7 @@ type LiveReader struct { rdr io.Reader err error rec []byte + snappyBuf []byte hdr [recordHeaderSize]byte buf [pageSize]byte readIndex int // Index in buf to start at for next read. @@ -171,11 +173,18 @@ func (r *LiveReader) buildRecord() (bool, error) { return false, nil } - rt := recType(r.hdr[0]) + rt := recTypeFromHeader(r.hdr[0]) if rt == recFirst || rt == recFull { r.rec = r.rec[:0] + r.snappyBuf = r.snappyBuf[:0] + } + + compressed := r.hdr[0]&snappyMask != 0 + if compressed { + r.snappyBuf = append(r.snappyBuf, temp...) + } else { + r.rec = append(r.rec, temp...) } - r.rec = append(r.rec, temp...) if err := validateRecord(rt, r.index); err != nil { r.index = 0 @@ -183,6 +192,16 @@ func (r *LiveReader) buildRecord() (bool, error) { } if rt == recLast || rt == recFull { r.index = 0 + if compressed && len(r.snappyBuf) > 0 { + // The snappy library uses `len` to calculate if we need a new buffer. + // In order to allocate as few buffers as possible make the length + // equal to the capacity. 
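+			// snappy.Decode only reuses the destination when len(dst) can hold
+			// the decoded block; spare capacity beyond the length is ignored.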
+ r.rec = r.rec[:cap(r.rec)] + r.rec, err = snappy.Decode(r.rec, r.snappyBuf) + if err != nil { + return false, err + } + } return true, nil } // Only increment i for non-zero records since we use it diff --git a/wal/reader.go b/wal/reader.go index 297463b00..7612f8775 100644 --- a/wal/reader.go +++ b/wal/reader.go @@ -19,6 +19,7 @@ import ( "hash/crc32" "io" + "github.com/golang/snappy" "github.com/pkg/errors" ) @@ -27,6 +28,7 @@ type Reader struct { rdr io.Reader err error rec []byte + snappyBuf []byte buf [pageSize]byte total int64 // Total bytes processed. curRecTyp recType // Used for checking that the last record is not torn. @@ -45,7 +47,7 @@ func (r *Reader) Next() bool { // The last WAL segment record shouldn't be torn(should be full or last). // The last record would be torn after a crash just before // the last record part could be persisted to disk. - if recType(r.curRecTyp) == recFirst || recType(r.curRecTyp) == recMiddle { + if r.curRecTyp == recFirst || r.curRecTyp == recMiddle { r.err = errors.New("last record is torn") } return false @@ -61,6 +63,7 @@ func (r *Reader) next() (err error) { buf := r.buf[recordHeaderSize:] r.rec = r.rec[:0] + r.snappyBuf = r.snappyBuf[:0] i := 0 for { @@ -68,7 +71,8 @@ func (r *Reader) next() (err error) { return errors.Wrap(err, "read first header byte") } r.total++ - r.curRecTyp = recType(hdr[0]) + r.curRecTyp = recTypeFromHeader(hdr[0]) + compressed := hdr[0]&snappyMask != 0 // Gobble up zero bytes. if r.curRecTyp == recPageTerm { @@ -123,12 +127,25 @@ func (r *Reader) next() (err error) { if c := crc32.Checksum(buf[:length], castagnoliTable); c != crc { return errors.Errorf("unexpected checksum %x, expected %x", c, crc) } - r.rec = append(r.rec, buf[:length]...) + + if compressed { + r.snappyBuf = append(r.snappyBuf, buf[:length]...) + } else { + r.rec = append(r.rec, buf[:length]...) + } if err := validateRecord(r.curRecTyp, i); err != nil { return err } if r.curRecTyp == recLast || r.curRecTyp == recFull { + if compressed && len(r.snappyBuf) > 0 { + // The snappy library uses `len` to calculate if we need a new buffer. + // In order to allocate as few buffers as possible make the length + // equal to the capacity. + r.rec = r.rec[:cap(r.rec)] + r.rec, err = snappy.Decode(r.rec, r.snappyBuf) + return err + } return nil } diff --git a/wal/reader_test.go b/wal/reader_test.go index 1a21d7bdb..1e15cae84 100644 --- a/wal/reader_test.go +++ b/wal/reader_test.go @@ -310,118 +310,124 @@ func allSegments(dir string) (io.ReadCloser, error) { func TestReaderFuzz(t *testing.T) { for name, fn := range readerConstructors { - t.Run(name, func(t *testing.T) { + for _, compress := range []bool{false, true} { + t.Run(fmt.Sprintf("%s,compress=%t", name, compress), func(t *testing.T) { + dir, err := ioutil.TempDir("", "wal_fuzz_live") + testutil.Ok(t, err) + defer func() { + testutil.Ok(t, os.RemoveAll(dir)) + }() + + w, err := NewSize(nil, nil, dir, 128*pageSize, compress) + testutil.Ok(t, err) + + // Buffering required as we're not reading concurrently. 
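+				// generateRandomEntries sends every record it writes before anything
+				// is read back out, so the channel must hold all of them at once.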
+ input := make(chan []byte, fuzzLen) + err = generateRandomEntries(w, input) + testutil.Ok(t, err) + close(input) + + err = w.Close() + testutil.Ok(t, err) + + sr, err := allSegments(w.Dir()) + testutil.Ok(t, err) + defer sr.Close() + + reader := fn(sr) + for expected := range input { + testutil.Assert(t, reader.Next(), "expected record: %v", reader.Err()) + testutil.Equals(t, expected, reader.Record(), "read wrong record") + } + testutil.Assert(t, !reader.Next(), "unexpected record") + }) + } + } +} + +func TestReaderFuzz_Live(t *testing.T) { + logger := testutil.NewLogger(t) + for _, compress := range []bool{false, true} { + t.Run(fmt.Sprintf("compress=%t", compress), func(t *testing.T) { dir, err := ioutil.TempDir("", "wal_fuzz_live") testutil.Ok(t, err) defer func() { testutil.Ok(t, os.RemoveAll(dir)) }() - w, err := NewSize(nil, nil, dir, 128*pageSize) + w, err := NewSize(nil, nil, dir, 128*pageSize, compress) + testutil.Ok(t, err) + defer w.Close() + + // In the background, generate a stream of random records and write them + // to the WAL. + input := make(chan []byte, fuzzLen/10) // buffering required as we sometimes batch WAL writes. + done := make(chan struct{}) + go func() { + err := generateRandomEntries(w, input) + testutil.Ok(t, err) + time.Sleep(100 * time.Millisecond) + close(done) + }() + + // Tail the WAL and compare the results. + m, _, err := w.Segments() testutil.Ok(t, err) - // Buffering required as we're not reading concurrently. - input := make(chan []byte, fuzzLen) - err = generateRandomEntries(w, input) - testutil.Ok(t, err) - close(input) - - err = w.Close() - testutil.Ok(t, err) - - sr, err := allSegments(w.Dir()) - testutil.Ok(t, err) - defer sr.Close() - - reader := fn(sr) - for expected := range input { - testutil.Assert(t, reader.Next(), "expected record: %v", reader.Err()) - testutil.Equals(t, expected, reader.Record(), "read wrong record") - } - testutil.Assert(t, !reader.Next(), "unexpected record") - }) - } -} - -func TestReaderFuzz_Live(t *testing.T) { - logger := testutil.NewLogger(t) - dir, err := ioutil.TempDir("", "wal_fuzz_live") - testutil.Ok(t, err) - defer func() { - testutil.Ok(t, os.RemoveAll(dir)) - }() - - w, err := NewSize(nil, nil, dir, 128*pageSize) - testutil.Ok(t, err) - defer w.Close() - - // In the background, generate a stream of random records and write them - // to the WAL. - input := make(chan []byte, fuzzLen/10) // buffering required as we sometimes batch WAL writes. - done := make(chan struct{}) - go func() { - err := generateRandomEntries(w, input) - testutil.Ok(t, err) - time.Sleep(100 * time.Millisecond) - close(done) - }() - - // Tail the WAL and compare the results. - m, _, err := w.Segments() - testutil.Ok(t, err) - - seg, err := OpenReadSegment(SegmentName(dir, m)) - testutil.Ok(t, err) - defer seg.Close() - - r := NewLiveReader(logger, nil, seg) - segmentTicker := time.NewTicker(100 * time.Millisecond) - readTicker := time.NewTicker(10 * time.Millisecond) - - readSegment := func(r *LiveReader) bool { - for r.Next() { - rec := r.Record() - expected, ok := <-input - testutil.Assert(t, ok, "unexpected record") - testutil.Equals(t, expected, rec, "record does not match expected") - } - testutil.Assert(t, r.Err() == io.EOF, "expected EOF, got: %v", r.Err()) - return true - } - -outer: - for { - select { - case <-segmentTicker.C: - // check if new segments exist - _, last, err := w.Segments() - testutil.Ok(t, err) - if last <= seg.i { - continue - } - - // read to end of segment. 
- readSegment(r) - - fi, err := os.Stat(SegmentName(dir, seg.i)) - testutil.Ok(t, err) - testutil.Assert(t, r.Offset() == fi.Size(), "expected to have read whole segment, but read %d of %d", r.Offset(), fi.Size()) - - seg, err = OpenReadSegment(SegmentName(dir, seg.i+1)) + seg, err := OpenReadSegment(SegmentName(dir, m)) testutil.Ok(t, err) defer seg.Close() - r = NewLiveReader(logger, nil, seg) - case <-readTicker.C: - readSegment(r) + r := NewLiveReader(logger, nil, seg) + segmentTicker := time.NewTicker(100 * time.Millisecond) + readTicker := time.NewTicker(10 * time.Millisecond) - case <-done: - readSegment(r) - break outer - } + readSegment := func(r *LiveReader) bool { + for r.Next() { + rec := r.Record() + expected, ok := <-input + testutil.Assert(t, ok, "unexpected record") + testutil.Equals(t, expected, rec, "record does not match expected") + } + testutil.Assert(t, r.Err() == io.EOF, "expected EOF, got: %v", r.Err()) + return true + } + + outer: + for { + select { + case <-segmentTicker.C: + // check if new segments exist + _, last, err := w.Segments() + testutil.Ok(t, err) + if last <= seg.i { + continue + } + + // read to end of segment. + readSegment(r) + + fi, err := os.Stat(SegmentName(dir, seg.i)) + testutil.Ok(t, err) + testutil.Assert(t, r.Offset() == fi.Size(), "expected to have read whole segment, but read %d of %d", r.Offset(), fi.Size()) + + seg, err = OpenReadSegment(SegmentName(dir, seg.i+1)) + testutil.Ok(t, err) + defer seg.Close() + r = NewLiveReader(logger, nil, seg) + + case <-readTicker.C: + readSegment(r) + + case <-done: + readSegment(r) + break outer + } + } + + testutil.Assert(t, r.Err() == io.EOF, "expected EOF") + }) } - - testutil.Assert(t, r.Err() == io.EOF, "expected EOF") } func TestLiveReaderCorrupt_ShortFile(t *testing.T) { @@ -434,7 +440,7 @@ func TestLiveReaderCorrupt_ShortFile(t *testing.T) { testutil.Ok(t, os.RemoveAll(dir)) }() - w, err := NewSize(nil, nil, dir, pageSize) + w, err := NewSize(nil, nil, dir, pageSize, false) testutil.Ok(t, err) rec := make([]byte, pageSize-recordHeaderSize) @@ -478,7 +484,7 @@ func TestLiveReaderCorrupt_RecordTooLongAndShort(t *testing.T) { testutil.Ok(t, os.RemoveAll(dir)) }() - w, err := NewSize(nil, nil, dir, pageSize*2) + w, err := NewSize(nil, nil, dir, pageSize*2, false) testutil.Ok(t, err) rec := make([]byte, pageSize-recordHeaderSize) @@ -525,7 +531,7 @@ func TestReaderData(t *testing.T) { for name, fn := range readerConstructors { t.Run(name, func(t *testing.T) { - w, err := New(nil, nil, dir) + w, err := New(nil, nil, dir, true) testutil.Ok(t, err) sr, err := allSegments(dir) diff --git a/wal/wal.go b/wal/wal.go index e9ad327ef..39daba975 100644 --- a/wal/wal.go +++ b/wal/wal.go @@ -29,6 +29,7 @@ import ( "github.com/go-kit/kit/log" "github.com/go-kit/kit/log/level" + "github.com/golang/snappy" "github.com/pkg/errors" "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/tsdb/fileutil" @@ -165,6 +166,8 @@ type WAL struct { stopc chan chan struct{} actorc chan func() closed bool // To allow calling Close() more than once without blocking. + compress bool + snappyBuf []byte fsyncDuration prometheus.Summary pageFlushes prometheus.Counter @@ -175,13 +178,13 @@ type WAL struct { } // New returns a new WAL over the given directory. 
-func New(logger log.Logger, reg prometheus.Registerer, dir string) (*WAL, error) { - return NewSize(logger, reg, dir, DefaultSegmentSize) +func New(logger log.Logger, reg prometheus.Registerer, dir string, compress bool) (*WAL, error) { + return NewSize(logger, reg, dir, DefaultSegmentSize, compress) } // NewSize returns a new WAL over the given directory. // New segments are created with the specified size. -func NewSize(logger log.Logger, reg prometheus.Registerer, dir string, segmentSize int) (*WAL, error) { +func NewSize(logger log.Logger, reg prometheus.Registerer, dir string, segmentSize int, compress bool) (*WAL, error) { if segmentSize%pageSize != 0 { return nil, errors.New("invalid segment size") } @@ -198,6 +201,7 @@ func NewSize(logger log.Logger, reg prometheus.Registerer, dir string, segmentSi page: &page{}, actorc: make(chan func(), 100), stopc: make(chan chan struct{}), + compress: compress, } w.fsyncDuration = prometheus.NewSummary(prometheus.SummaryOpts{ Name: "prometheus_tsdb_wal_fsync_duration_seconds", @@ -253,6 +257,11 @@ func NewSize(logger log.Logger, reg prometheus.Registerer, dir string, segmentSi return w, nil } +// CompressionEnabled returns if compression is enabled on this WAL. +func (w *WAL) CompressionEnabled() bool { + return w.compress +} + // Dir returns the directory of the WAL. func (w *WAL) Dir() string { return w.dir @@ -476,6 +485,14 @@ func (w *WAL) flushPage(clear bool) error { return nil } +// First Byte of header format: +// [ 4 bits unallocated] [1 bit snappy compression flag] [ 3 bit record type ] + +const ( + snappyMask = 1 << 3 + recTypeMask = snappyMask - 1 +) + type recType uint8 const ( @@ -486,6 +503,10 @@ const ( recLast recType = 4 // Final fragment of a record. ) +func recTypeFromHeader(header byte) recType { + return recType(header & recTypeMask) +} + func (t recType) String() string { switch t { case recPageTerm: @@ -546,6 +567,19 @@ func (w *WAL) log(rec []byte, final bool) error { } } + compressed := false + if w.compress && len(rec) > 0 { + // The snappy library uses `len` to calculate if we need a new buffer. + // In order to allocate as few buffers as possible make the length + // equal to the capacity. + w.snappyBuf = w.snappyBuf[:cap(w.snappyBuf)] + w.snappyBuf = snappy.Encode(w.snappyBuf, rec) + if len(w.snappyBuf) < len(rec) { + rec = w.snappyBuf + compressed = true + } + } + // Populate as many pages as necessary to fit the record. // Be careful to always do one pass to ensure we write zero-length records. for i := 0; i == 0 || len(rec) > 0; i++ { @@ -569,6 +603,9 @@ func (w *WAL) log(rec []byte, final bool) error { default: typ = recMiddle } + if compressed { + typ |= snappyMask + } buf[0] = byte(typ) crc := crc32.Checksum(part, castagnoliTable) diff --git a/wal/wal_test.go b/wal/wal_test.go index 50761e07a..d2a6ccc25 100644 --- a/wal/wal_test.go +++ b/wal/wal_test.go @@ -120,7 +120,7 @@ func TestWALRepair_ReadingError(t *testing.T) { // then corrupt a given record in a given segment. // As a result we want a repaired WAL with given intact records. 
 	segSize := 3 * pageSize
-	w, err := NewSize(nil, nil, dir, segSize)
+	w, err := NewSize(nil, nil, dir, segSize, false)
 	testutil.Ok(t, err)
 
 	var records [][]byte
@@ -145,7 +145,7 @@ func TestWALRepair_ReadingError(t *testing.T) {
 
 	testutil.Ok(t, f.Close())
 
-	w, err = NewSize(nil, nil, dir, segSize)
+	w, err = NewSize(nil, nil, dir, segSize, false)
 	testutil.Ok(t, err)
 	defer w.Close()
 
@@ -223,7 +223,7 @@ func TestCorruptAndCarryOn(t *testing.T) {
 	// Produce a WAL with a two segments of 3 pages with 3 records each,
 	// so when we truncate the file we're guaranteed to split a record.
 	{
-		w, err := NewSize(logger, nil, dir, segmentSize)
+		w, err := NewSize(logger, nil, dir, segmentSize, false)
 		testutil.Ok(t, err)
 
 		for i := 0; i < 18; i++ {
@@ -294,7 +294,7 @@ func TestCorruptAndCarryOn(t *testing.T) {
 		err = sr.Close()
 		testutil.Ok(t, err)
 
-		w, err := NewSize(logger, nil, dir, segmentSize)
+		w, err := NewSize(logger, nil, dir, segmentSize, false)
 		testutil.Ok(t, err)
 
 		err = w.Repair(corruptionErr)
@@ -341,7 +341,7 @@ func TestClose(t *testing.T) {
 	defer func() {
 		testutil.Ok(t, os.RemoveAll(dir))
 	}()
-	w, err := NewSize(nil, nil, dir, pageSize)
+	w, err := NewSize(nil, nil, dir, pageSize, false)
 	testutil.Ok(t, err)
 	testutil.Ok(t, w.Close())
 	testutil.NotOk(t, w.Close())
@@ -358,7 +358,7 @@ func TestSegmentMetric(t *testing.T) {
 	defer func() {
 		testutil.Ok(t, os.RemoveAll(dir))
 	}()
-	w, err := NewSize(nil, nil, dir, segmentSize)
+	w, err := NewSize(nil, nil, dir, segmentSize, false)
 	testutil.Ok(t, err)
 
 	initialSegment := client_testutil.ToFloat64(w.currentSegment)
@@ -376,56 +376,104 @@ func TestSegmentMetric(t *testing.T) {
 	testutil.Ok(t, w.Close())
 }
 
-func BenchmarkWAL_LogBatched(b *testing.B) {
-	dir, err := ioutil.TempDir("", "bench_logbatch")
-	testutil.Ok(b, err)
+func TestCompression(t *testing.T) {
+	bootstrap := func(compressed bool) string {
+		const (
+			segmentSize = pageSize
+			recordSize  = (pageSize / 2) - recordHeaderSize
+			records     = 100
+		)
+
+		dirPath, err := ioutil.TempDir("", fmt.Sprintf("TestCompression_%t", compressed))
+		testutil.Ok(t, err)
+
+		w, err := NewSize(nil, nil, dirPath, segmentSize, compressed)
+		testutil.Ok(t, err)
+
+		buf := make([]byte, recordSize)
+		for i := 0; i < records; i++ {
+			testutil.Ok(t, w.Log(buf))
+		}
+		testutil.Ok(t, w.Close())
+
+		return dirPath
+	}
+
+	dirCompressed := bootstrap(true)
 	defer func() {
-		testutil.Ok(b, os.RemoveAll(dir))
+		testutil.Ok(t, os.RemoveAll(dirCompressed))
+	}()
+	dirUnCompressed := bootstrap(false)
+	defer func() {
+		testutil.Ok(t, os.RemoveAll(dirUnCompressed))
 	}()
 
-	w, err := New(nil, nil, "testdir")
-	testutil.Ok(b, err)
-	defer w.Close()
+	uncompressedSize, err := testutil.DirSize(dirUnCompressed)
+	testutil.Ok(t, err)
+	compressedSize, err := testutil.DirSize(dirCompressed)
+	testutil.Ok(t, err)
 
-	var buf [2048]byte
-	var recs [][]byte
-	b.SetBytes(2048)
+	testutil.Assert(t, float64(uncompressedSize)*0.75 > float64(compressedSize), "Compressing zeroes should save at least 25%% space - uncompressedSize: %d, compressedSize: %d", uncompressedSize, compressedSize)
+}
 
-	for i := 0; i < b.N; i++ {
-		recs = append(recs, buf[:])
-		if len(recs) < 1000 {
-			continue
-		}
-		err := w.Log(recs...)
- testutil.Ok(b, err) - recs = recs[:0] +func BenchmarkWAL_LogBatched(b *testing.B) { + for _, compress := range []bool{true, false} { + b.Run(fmt.Sprintf("compress=%t", compress), func(b *testing.B) { + dir, err := ioutil.TempDir("", "bench_logbatch") + testutil.Ok(b, err) + defer func() { + testutil.Ok(b, os.RemoveAll(dir)) + }() + + w, err := New(nil, nil, dir, compress) + testutil.Ok(b, err) + defer w.Close() + + var buf [2048]byte + var recs [][]byte + b.SetBytes(2048) + + for i := 0; i < b.N; i++ { + recs = append(recs, buf[:]) + if len(recs) < 1000 { + continue + } + err := w.Log(recs...) + testutil.Ok(b, err) + recs = recs[:0] + } + // Stop timer to not count fsync time on close. + // If it's counted batched vs. single benchmarks are very similar but + // do not show burst throughput well. + b.StopTimer() + }) } - // Stop timer to not count fsync time on close. - // If it's counted batched vs. single benchmarks are very similar but - // do not show burst throughput well. - b.StopTimer() } func BenchmarkWAL_Log(b *testing.B) { - dir, err := ioutil.TempDir("", "bench_logsingle") - testutil.Ok(b, err) - defer func() { - testutil.Ok(b, os.RemoveAll(dir)) - }() + for _, compress := range []bool{true, false} { + b.Run(fmt.Sprintf("compress=%t", compress), func(b *testing.B) { + dir, err := ioutil.TempDir("", "bench_logsingle") + testutil.Ok(b, err) + defer func() { + testutil.Ok(b, os.RemoveAll(dir)) + }() - w, err := New(nil, nil, "testdir") - testutil.Ok(b, err) - defer w.Close() + w, err := New(nil, nil, dir, compress) + testutil.Ok(b, err) + defer w.Close() - var buf [2048]byte - b.SetBytes(2048) + var buf [2048]byte + b.SetBytes(2048) - for i := 0; i < b.N; i++ { - err := w.Log(buf[:]) - testutil.Ok(b, err) + for i := 0; i < b.N; i++ { + err := w.Log(buf[:]) + testutil.Ok(b, err) + } + // Stop timer to not count fsync time on close. + // If it's counted batched vs. single benchmarks are very similar but + // do not show burst throughput well. + b.StopTimer() + }) } - // Stop timer to not count fsync time on close. - // If it's counted batched vs. single benchmarks are very similar but - // do not show burst throughput well. - b.StopTimer() } diff --git a/wal_test.go b/wal_test.go index 7f07a63b9..0fed5b415 100644 --- a/wal_test.go +++ b/wal_test.go @@ -459,7 +459,7 @@ func TestMigrateWAL_Empty(t *testing.T) { wdir := path.Join(dir, "wal") // Initialize empty WAL. - w, err := wal.New(nil, nil, wdir) + w, err := wal.New(nil, nil, wdir, false) testutil.Ok(t, err) testutil.Ok(t, w.Close()) @@ -506,7 +506,7 @@ func TestMigrateWAL_Fuzz(t *testing.T) { // Perform migration. testutil.Ok(t, MigrateWAL(nil, wdir)) - w, err := wal.New(nil, nil, wdir) + w, err := wal.New(nil, nil, wdir, false) testutil.Ok(t, err) // We can properly write some new data after migration.
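
A note on the mechanism this diff introduces: callers opt in through Options.WALCompression (db.go) or the new compress argument to wal.New/NewSize, and every record then advertises its own encoding in bit 3 of its first header byte. The standalone sketch below is illustrative only (the helper name maybeCompress is ours, not part of the change); it mirrors the snappyMask/recTypeMask constants and the compress-only-when-smaller policy of (*WAL).log in wal/wal.go, using github.com/golang/snappy as pinned in go.mod.

package main

import (
	"fmt"

	"github.com/golang/snappy"
)

// First byte of a record header: the low 3 bits carry the fragment type,
// bit 3 flags a snappy-compressed payload (mirrors wal/wal.go).
const (
	snappyMask  = 1 << 3
	recTypeMask = snappyMask - 1

	recLast = 4 // final fragment of a record
)

// maybeCompress follows the policy of (*WAL).log: keep the raw record
// unless the compressed form is strictly smaller.
func maybeCompress(rec []byte) ([]byte, bool) {
	buf := snappy.Encode(nil, rec)
	if len(buf) < len(rec) {
		return buf, true
	}
	return rec, false
}

func main() {
	rec := make([]byte, 1024) // zero bytes compress extremely well

	// Writer side: set the flag on top of the record type.
	payload, compressed := maybeCompress(rec)
	hdr := byte(recLast)
	if compressed {
		hdr |= snappyMask
	}

	// Reader side: mask the flag off before interpreting the type, as
	// recTypeFromHeader does, and decompress only when the flag is set.
	typ := hdr & recTypeMask
	out := payload
	if hdr&snappyMask != 0 {
		var err error
		out, err = snappy.Decode(nil, payload)
		if err != nil {
			panic(err)
		}
	}
	fmt.Printf("type=%d compressed=%t decoded=%d bytes\n", typ, compressed, len(out))
}

Because the flag travels with each record rather than with the segment, compressed and uncompressed records can coexist in one WAL, which is what lets a compression-enabled instance still read segments written before the option was turned on.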