From c2b88992a39d4f3c342b3b217b80d20616a48934 Mon Sep 17 00:00:00 2001 From: Callum Styan Date: Tue, 9 Apr 2019 02:52:44 -0700 Subject: [PATCH] Remote Write: fix checkpoint reading (#5429) * Fix ReadCheckpoint to ensure that it actually reads all the contents of each segment in a checkpoint dir, or returns an error. Signed-off-by: Callum Styan --- storage/remote/wal_watcher.go | 65 +++++++++------------- storage/remote/wal_watcher_test.go | 89 ++++++++++++++++++++++++++---- 2 files changed, 104 insertions(+), 50 deletions(-) diff --git a/storage/remote/wal_watcher.go b/storage/remote/wal_watcher.go index 17c2a6573..4c5826415 100644 --- a/storage/remote/wal_watcher.go +++ b/storage/remote/wal_watcher.go @@ -216,7 +216,7 @@ func (w *WALWatcher) run() error { // findSegmentForIndex finds the first segment greater than or equal to index. func (w *WALWatcher) findSegmentForIndex(index int) (int, error) { - refs, err := w.segments() + refs, err := w.segments(w.walDir) if err != nil { return -1, nil } @@ -231,7 +231,7 @@ func (w *WALWatcher) findSegmentForIndex(index int) (int, error) { } func (w *WALWatcher) firstAndLast() (int, int, error) { - refs, err := w.segments() + refs, err := w.segments(w.walDir) if err != nil { return -1, -1, nil } @@ -244,8 +244,8 @@ func (w *WALWatcher) firstAndLast() (int, int, error) { // Copied from tsdb/wal/wal.go so we do not have to open a WAL. // Plan is to move WAL watcher to TSDB and dedupe these implementations. -func (w *WALWatcher) segments() ([]int, error) { - files, err := fileutil.ReadDir(w.walDir) +func (w *WALWatcher) segments(dir string) ([]int, error) { + files, err := fileutil.ReadDir(dir) if err != nil { return nil, err } @@ -476,27 +476,34 @@ func (w *WALWatcher) readCheckpoint(checkpointDir string) error { return errors.Wrap(err, "checkpointNum") } - sr, err := wal.NewSegmentsReader(checkpointDir) + // Ensure we read the whole contents of every segment in the checkpoint dir. + segs, err := w.segments(checkpointDir) if err != nil { - return errors.Wrap(err, "NewSegmentsReader") + return errors.Wrap(err, "Unable to get segments checkpoint dir") } - defer sr.Close() + for _, seg := range segs { + size, err := getSegmentSize(checkpointDir, seg) + if err != nil { + return errors.Wrap(err, "getSegmentSize") + } - size, err := getCheckpointSize(checkpointDir) - if err != nil { - return errors.Wrap(err, "getCheckpointSize") + sr, err := wal.OpenReadSegment(wal.SegmentName(checkpointDir, seg)) + if err != nil { + return errors.Wrap(err, "unable to open segment") + } + defer sr.Close() + + r := wal.NewLiveReader(w.logger, sr) + if err := w.readSegment(r, index, false); err != io.EOF && err != nil { + return errors.Wrap(err, "readSegment") + } + + if r.Offset() != size { + return fmt.Errorf("readCheckpoint wasn't able to read all data from the checkpoint %s/%08d, size: %d, totalRead: %d", checkpointDir, seg, size, r.Offset()) + } } - r := wal.NewLiveReader(w.logger, sr) - if err := w.readSegment(r, index, false); err != io.EOF { - return errors.Wrap(err, "readSegment") - } - - if r.Offset() != size { - level.Warn(w.logger).Log("msg", "may not have read all data from checkpoint", "totalRead", r.Offset(), "size", size) - } level.Debug(w.logger).Log("msg", "read series references from checkpoint", "checkpoint", checkpointDir) - return nil } @@ -515,26 +522,6 @@ func checkpointNum(dir string) (int, error) { return result, nil } -func getCheckpointSize(dir string) (int64, error) { - i := int64(0) - segs, err := fileutil.ReadDir(dir) - if err != nil { - return 0, err - } - for _, fn := range segs { - num, err := strconv.Atoi(fn) - if err != nil { - return i, err - } - sz, err := getSegmentSize(dir, num) - if err != nil { - return i, err - } - i += sz - } - return i, nil -} - // Get size of segment. func getSegmentSize(dir string, index int) (int64, error) { i := int64(-1) diff --git a/storage/remote/wal_watcher_test.go b/storage/remote/wal_watcher_test.go index 27994a595..a802a7646 100644 --- a/storage/remote/wal_watcher_test.go +++ b/storage/remote/wal_watcher_test.go @@ -88,7 +88,7 @@ func newWriteToMock() *writeToMock { } } -func Test_tail_samples(t *testing.T) { +func TestTailSamples(t *testing.T) { pageSize := 32 * 1024 const seriesCount = 10 const samplesCount = 250 @@ -160,7 +160,7 @@ func Test_tail_samples(t *testing.T) { testutil.Equals(t, expectedSamples, wt.samplesAppended) } -func Test_readToEnd_noCheckpoint(t *testing.T) { +func TestReadToEndNoCheckpoint(t *testing.T) { pageSize := 32 * 1024 const seriesCount = 10 const samplesCount = 250 @@ -222,7 +222,7 @@ func Test_readToEnd_noCheckpoint(t *testing.T) { testutil.Equals(t, expected, wt.checkNumLabels()) } -func Test_readToEnd_withCheckpoint(t *testing.T) { +func TestReadToEndWithCheckpoint(t *testing.T) { segmentSize := 32 * 1024 // We need something similar to this # of series and samples // in order to get enough segments for us to checkpoint. @@ -304,7 +304,7 @@ func Test_readToEnd_withCheckpoint(t *testing.T) { testutil.Equals(t, expected, wt.checkNumLabels()) } -func Test_readCheckpoint(t *testing.T) { +func TestReadCheckpoint(t *testing.T) { pageSize := 32 * 1024 const seriesCount = 10 const samplesCount = 250 @@ -366,12 +366,77 @@ func Test_readCheckpoint(t *testing.T) { testutil.Equals(t, expectedSeries, wt.checkNumLabels()) } -func Test_checkpoint_seriesReset(t *testing.T) { +func TestReadCheckpointMultipleSegments(t *testing.T) { + pageSize := 32 * 1024 + + const segments = 1 + const seriesCount = 20 + const samplesCount = 300 + + dir, err := ioutil.TempDir("", "readCheckpoint") + testutil.Ok(t, err) + defer os.RemoveAll(dir) + + wdir := path.Join(dir, "wal") + err = os.Mkdir(wdir, 0777) + testutil.Ok(t, err) + + enc := tsdb.RecordEncoder{} + w, err := wal.NewSize(nil, nil, wdir, pageSize) + testutil.Ok(t, err) + + // Write a bunch of data. + for i := 0; i < segments; i++ { + for j := 0; j < seriesCount; j++ { + ref := j + (i * 100) + series := enc.Series([]tsdb.RefSeries{ + tsdb.RefSeries{ + Ref: uint64(ref), + Labels: labels.Labels{labels.Label{"__name__", fmt.Sprintf("metric_%d", j)}}, + }, + }, nil) + testutil.Ok(t, w.Log(series)) + + for k := 0; k < samplesCount; k++ { + inner := rand.Intn(ref + 1) + sample := enc.Samples([]tsdb.RefSample{ + tsdb.RefSample{ + Ref: uint64(inner), + T: int64(i), + V: float64(i), + }, + }, nil) + testutil.Ok(t, w.Log(sample)) + } + } + } + + // At this point we should have at least 6 segments, lets create a checkpoint dir of the first 5. + checkpointDir := dir + "/wal/checkpoint.000004" + err = os.Mkdir(checkpointDir, 0777) + testutil.Ok(t, err) + for i := 0; i <= 4; i++ { + err := os.Rename(wal.SegmentName(dir+"/wal", i), wal.SegmentName(checkpointDir, i)) + testutil.Ok(t, err) + } + + wt := newWriteToMock() + watcher := NewWALWatcher(nil, "", wt, dir) + watcher.maxSegment = -1 + + lastCheckpoint, _, err := tsdb.LastCheckpoint(watcher.walDir) + testutil.Ok(t, err) + + err = watcher.readCheckpoint(lastCheckpoint) + testutil.Ok(t, err) +} + +func TestCheckpointSeriesReset(t *testing.T) { segmentSize := 32 * 1024 // We need something similar to this # of series and samples // in order to get enough segments for us to checkpoint. - const seriesCount = 10 - const samplesCount = 250 + const seriesCount = 20 + const samplesCount = 350 dir, err := ioutil.TempDir("", "seriesReset") testutil.Ok(t, err) @@ -421,20 +486,22 @@ func Test_checkpoint_seriesReset(t *testing.T) { retry(t, defaultRetryInterval, defaultRetries, func() bool { return wt.checkNumLabels() >= expected }) - watcher.Stop() testutil.Equals(t, seriesCount, wt.checkNumLabels()) - _, err = tsdb.Checkpoint(w, 0, 1, func(x uint64) bool { return true }, 0) + _, err = tsdb.Checkpoint(w, 2, 4, func(x uint64) bool { return true }, 0) testutil.Ok(t, err) - w.Truncate(1) + err = w.Truncate(5) + testutil.Ok(t, err) _, cpi, err := tsdb.LastCheckpoint(path.Join(dir, "wal")) testutil.Ok(t, err) err = watcher.garbageCollectSeries(cpi + 1) testutil.Ok(t, err) + + watcher.Stop() // If you modify the checkpoint and truncate segment #'s run the test to see how // many series records you end up with and change the last Equals check accordingly // or modify the Equals to Assert(len(wt.seriesLabels) < seriesCount*10) - testutil.Equals(t, 6, wt.checkNumLabels()) + testutil.Equals(t, 14, wt.checkNumLabels()) }