Mirror of https://github.com/prometheus/prometheus (synced 2024-12-24 23:42:32 +00:00)
Remote Write: fix checkpoint reading (#5429)
* Fix ReadCheckpoint to ensure that it actually reads all the contents of each segment in a checkpoint dir, or returns an error.

Signed-off-by: Callum Styan <callumstyan@gmail.com>
This commit is contained in:
parent f7184978f4
commit c2b88992a3
@ -216,7 +216,7 @@ func (w *WALWatcher) run() error {
|
|||||||
|
|
||||||
// findSegmentForIndex finds the first segment greater than or equal to index.
|
// findSegmentForIndex finds the first segment greater than or equal to index.
|
||||||
func (w *WALWatcher) findSegmentForIndex(index int) (int, error) {
|
func (w *WALWatcher) findSegmentForIndex(index int) (int, error) {
|
||||||
refs, err := w.segments()
|
refs, err := w.segments(w.walDir)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return -1, nil
|
return -1, nil
|
||||||
}
|
}
|
||||||
@ -231,7 +231,7 @@ func (w *WALWatcher) findSegmentForIndex(index int) (int, error) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (w *WALWatcher) firstAndLast() (int, int, error) {
|
func (w *WALWatcher) firstAndLast() (int, int, error) {
|
||||||
refs, err := w.segments()
|
refs, err := w.segments(w.walDir)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return -1, -1, nil
|
return -1, -1, nil
|
||||||
}
|
}
|
||||||
@ -244,8 +244,8 @@ func (w *WALWatcher) firstAndLast() (int, int, error) {
|
|||||||
|
|
||||||
// Copied from tsdb/wal/wal.go so we do not have to open a WAL.
|
// Copied from tsdb/wal/wal.go so we do not have to open a WAL.
|
||||||
// Plan is to move WAL watcher to TSDB and dedupe these implementations.
|
// Plan is to move WAL watcher to TSDB and dedupe these implementations.
|
||||||
func (w *WALWatcher) segments() ([]int, error) {
|
func (w *WALWatcher) segments(dir string) ([]int, error) {
|
||||||
files, err := fileutil.ReadDir(w.walDir)
|
files, err := fileutil.ReadDir(dir)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
@ -476,27 +476,34 @@ func (w *WALWatcher) readCheckpoint(checkpointDir string) error {
|
|||||||
return errors.Wrap(err, "checkpointNum")
|
return errors.Wrap(err, "checkpointNum")
|
||||||
}
|
}
|
||||||
|
|
||||||
sr, err := wal.NewSegmentsReader(checkpointDir)
|
// Ensure we read the whole contents of every segment in the checkpoint dir.
|
||||||
|
segs, err := w.segments(checkpointDir)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return errors.Wrap(err, "NewSegmentsReader")
|
return errors.Wrap(err, "Unable to get segments checkpoint dir")
|
||||||
}
|
}
|
||||||
defer sr.Close()
|
for _, seg := range segs {
|
||||||
|
size, err := getSegmentSize(checkpointDir, seg)
|
||||||
|
if err != nil {
|
||||||
|
return errors.Wrap(err, "getSegmentSize")
|
||||||
|
}
|
||||||
|
|
||||||
size, err := getCheckpointSize(checkpointDir)
|
sr, err := wal.OpenReadSegment(wal.SegmentName(checkpointDir, seg))
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return errors.Wrap(err, "getCheckpointSize")
|
return errors.Wrap(err, "unable to open segment")
|
||||||
|
}
|
||||||
|
defer sr.Close()
|
||||||
|
|
||||||
|
r := wal.NewLiveReader(w.logger, sr)
|
||||||
|
if err := w.readSegment(r, index, false); err != io.EOF && err != nil {
|
||||||
|
return errors.Wrap(err, "readSegment")
|
||||||
|
}
|
||||||
|
|
||||||
|
if r.Offset() != size {
|
||||||
|
return fmt.Errorf("readCheckpoint wasn't able to read all data from the checkpoint %s/%08d, size: %d, totalRead: %d", checkpointDir, seg, size, r.Offset())
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
r := wal.NewLiveReader(w.logger, sr)
|
|
||||||
if err := w.readSegment(r, index, false); err != io.EOF {
|
|
||||||
return errors.Wrap(err, "readSegment")
|
|
||||||
}
|
|
||||||
|
|
||||||
if r.Offset() != size {
|
|
||||||
level.Warn(w.logger).Log("msg", "may not have read all data from checkpoint", "totalRead", r.Offset(), "size", size)
|
|
||||||
}
|
|
||||||
level.Debug(w.logger).Log("msg", "read series references from checkpoint", "checkpoint", checkpointDir)
|
level.Debug(w.logger).Log("msg", "read series references from checkpoint", "checkpoint", checkpointDir)
|
||||||
|
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -515,26 +522,6 @@ func checkpointNum(dir string) (int, error) {
|
|||||||
return result, nil
|
return result, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func getCheckpointSize(dir string) (int64, error) {
|
|
||||||
i := int64(0)
|
|
||||||
segs, err := fileutil.ReadDir(dir)
|
|
||||||
if err != nil {
|
|
||||||
return 0, err
|
|
||||||
}
|
|
||||||
for _, fn := range segs {
|
|
||||||
num, err := strconv.Atoi(fn)
|
|
||||||
if err != nil {
|
|
||||||
return i, err
|
|
||||||
}
|
|
||||||
sz, err := getSegmentSize(dir, num)
|
|
||||||
if err != nil {
|
|
||||||
return i, err
|
|
||||||
}
|
|
||||||
i += sz
|
|
||||||
}
|
|
||||||
return i, nil
|
|
||||||
}
|
|
||||||
|
|
||||||
// Get size of segment.
|
// Get size of segment.
|
||||||
func getSegmentSize(dir string, index int) (int64, error) {
|
func getSegmentSize(dir string, index int) (int64, error) {
|
||||||
i := int64(-1)
|
i := int64(-1)
|
||||||
|
@ -88,7 +88,7 @@ func newWriteToMock() *writeToMock {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func Test_tail_samples(t *testing.T) {
|
func TestTailSamples(t *testing.T) {
|
||||||
pageSize := 32 * 1024
|
pageSize := 32 * 1024
|
||||||
const seriesCount = 10
|
const seriesCount = 10
|
||||||
const samplesCount = 250
|
const samplesCount = 250
|
||||||
@ -160,7 +160,7 @@ func Test_tail_samples(t *testing.T) {
|
|||||||
testutil.Equals(t, expectedSamples, wt.samplesAppended)
|
testutil.Equals(t, expectedSamples, wt.samplesAppended)
|
||||||
}
|
}
|
||||||
|
|
||||||
func Test_readToEnd_noCheckpoint(t *testing.T) {
|
func TestReadToEndNoCheckpoint(t *testing.T) {
|
||||||
pageSize := 32 * 1024
|
pageSize := 32 * 1024
|
||||||
const seriesCount = 10
|
const seriesCount = 10
|
||||||
const samplesCount = 250
|
const samplesCount = 250
|
||||||
@ -222,7 +222,7 @@ func Test_readToEnd_noCheckpoint(t *testing.T) {
|
|||||||
testutil.Equals(t, expected, wt.checkNumLabels())
|
testutil.Equals(t, expected, wt.checkNumLabels())
|
||||||
}
|
}
|
||||||
|
|
||||||
func Test_readToEnd_withCheckpoint(t *testing.T) {
|
func TestReadToEndWithCheckpoint(t *testing.T) {
|
||||||
segmentSize := 32 * 1024
|
segmentSize := 32 * 1024
|
||||||
// We need something similar to this # of series and samples
|
// We need something similar to this # of series and samples
|
||||||
// in order to get enough segments for us to checkpoint.
|
// in order to get enough segments for us to checkpoint.
|
||||||
@ -304,7 +304,7 @@ func Test_readToEnd_withCheckpoint(t *testing.T) {
|
|||||||
testutil.Equals(t, expected, wt.checkNumLabels())
|
testutil.Equals(t, expected, wt.checkNumLabels())
|
||||||
}
|
}
|
||||||
|
|
||||||
func Test_readCheckpoint(t *testing.T) {
|
func TestReadCheckpoint(t *testing.T) {
|
||||||
pageSize := 32 * 1024
|
pageSize := 32 * 1024
|
||||||
const seriesCount = 10
|
const seriesCount = 10
|
||||||
const samplesCount = 250
|
const samplesCount = 250
|
||||||
@ -366,12 +366,77 @@ func Test_readCheckpoint(t *testing.T) {
|
|||||||
testutil.Equals(t, expectedSeries, wt.checkNumLabels())
|
testutil.Equals(t, expectedSeries, wt.checkNumLabels())
|
||||||
}
|
}
|
||||||
|
|
||||||
func Test_checkpoint_seriesReset(t *testing.T) {
|
func TestReadCheckpointMultipleSegments(t *testing.T) {
|
||||||
|
pageSize := 32 * 1024
|
||||||
|
|
||||||
|
const segments = 1
|
||||||
|
const seriesCount = 20
|
||||||
|
const samplesCount = 300
|
||||||
|
|
||||||
|
dir, err := ioutil.TempDir("", "readCheckpoint")
|
||||||
|
testutil.Ok(t, err)
|
||||||
|
defer os.RemoveAll(dir)
|
||||||
|
|
||||||
|
wdir := path.Join(dir, "wal")
|
||||||
|
err = os.Mkdir(wdir, 0777)
|
||||||
|
testutil.Ok(t, err)
|
||||||
|
|
||||||
|
enc := tsdb.RecordEncoder{}
|
||||||
|
w, err := wal.NewSize(nil, nil, wdir, pageSize)
|
||||||
|
testutil.Ok(t, err)
|
||||||
|
|
||||||
|
// Write a bunch of data.
|
||||||
|
for i := 0; i < segments; i++ {
|
||||||
|
for j := 0; j < seriesCount; j++ {
|
||||||
|
ref := j + (i * 100)
|
||||||
|
series := enc.Series([]tsdb.RefSeries{
|
||||||
|
tsdb.RefSeries{
|
||||||
|
Ref: uint64(ref),
|
||||||
|
Labels: labels.Labels{labels.Label{"__name__", fmt.Sprintf("metric_%d", j)}},
|
||||||
|
},
|
||||||
|
}, nil)
|
||||||
|
testutil.Ok(t, w.Log(series))
|
||||||
|
|
||||||
|
for k := 0; k < samplesCount; k++ {
|
||||||
|
inner := rand.Intn(ref + 1)
|
||||||
|
sample := enc.Samples([]tsdb.RefSample{
|
||||||
|
tsdb.RefSample{
|
||||||
|
Ref: uint64(inner),
|
||||||
|
T: int64(i),
|
||||||
|
V: float64(i),
|
||||||
|
},
|
||||||
|
}, nil)
|
||||||
|
testutil.Ok(t, w.Log(sample))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// At this point we should have at least 6 segments, lets create a checkpoint dir of the first 5.
|
||||||
|
checkpointDir := dir + "/wal/checkpoint.000004"
|
||||||
|
err = os.Mkdir(checkpointDir, 0777)
|
||||||
|
testutil.Ok(t, err)
|
||||||
|
for i := 0; i <= 4; i++ {
|
||||||
|
err := os.Rename(wal.SegmentName(dir+"/wal", i), wal.SegmentName(checkpointDir, i))
|
||||||
|
testutil.Ok(t, err)
|
||||||
|
}
|
||||||
|
|
||||||
|
wt := newWriteToMock()
|
||||||
|
watcher := NewWALWatcher(nil, "", wt, dir)
|
||||||
|
watcher.maxSegment = -1
|
||||||
|
|
||||||
|
lastCheckpoint, _, err := tsdb.LastCheckpoint(watcher.walDir)
|
||||||
|
testutil.Ok(t, err)
|
||||||
|
|
||||||
|
err = watcher.readCheckpoint(lastCheckpoint)
|
||||||
|
testutil.Ok(t, err)
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestCheckpointSeriesReset(t *testing.T) {
|
||||||
segmentSize := 32 * 1024
|
segmentSize := 32 * 1024
|
||||||
// We need something similar to this # of series and samples
|
// We need something similar to this # of series and samples
|
||||||
// in order to get enough segments for us to checkpoint.
|
// in order to get enough segments for us to checkpoint.
|
||||||
const seriesCount = 10
|
const seriesCount = 20
|
||||||
const samplesCount = 250
|
const samplesCount = 350
|
||||||
|
|
||||||
dir, err := ioutil.TempDir("", "seriesReset")
|
dir, err := ioutil.TempDir("", "seriesReset")
|
||||||
testutil.Ok(t, err)
|
testutil.Ok(t, err)
|
||||||
@ -421,20 +486,22 @@ func Test_checkpoint_seriesReset(t *testing.T) {
|
|||||||
retry(t, defaultRetryInterval, defaultRetries, func() bool {
|
retry(t, defaultRetryInterval, defaultRetries, func() bool {
|
||||||
return wt.checkNumLabels() >= expected
|
return wt.checkNumLabels() >= expected
|
||||||
})
|
})
|
||||||
watcher.Stop()
|
|
||||||
testutil.Equals(t, seriesCount, wt.checkNumLabels())
|
testutil.Equals(t, seriesCount, wt.checkNumLabels())
|
||||||
|
|
||||||
_, err = tsdb.Checkpoint(w, 0, 1, func(x uint64) bool { return true }, 0)
|
_, err = tsdb.Checkpoint(w, 2, 4, func(x uint64) bool { return true }, 0)
|
||||||
testutil.Ok(t, err)
|
testutil.Ok(t, err)
|
||||||
|
|
||||||
w.Truncate(1)
|
err = w.Truncate(5)
|
||||||
|
testutil.Ok(t, err)
|
||||||
|
|
||||||
_, cpi, err := tsdb.LastCheckpoint(path.Join(dir, "wal"))
|
_, cpi, err := tsdb.LastCheckpoint(path.Join(dir, "wal"))
|
||||||
testutil.Ok(t, err)
|
testutil.Ok(t, err)
|
||||||
err = watcher.garbageCollectSeries(cpi + 1)
|
err = watcher.garbageCollectSeries(cpi + 1)
|
||||||
testutil.Ok(t, err)
|
testutil.Ok(t, err)
|
||||||
|
|
||||||
|
watcher.Stop()
|
||||||
// If you modify the checkpoint and truncate segment #'s run the test to see how
|
// If you modify the checkpoint and truncate segment #'s run the test to see how
|
||||||
// many series records you end up with and change the last Equals check accordingly
|
// many series records you end up with and change the last Equals check accordingly
|
||||||
// or modify the Equals to Assert(len(wt.seriesLabels) < seriesCount*10)
|
// or modify the Equals to Assert(len(wt.seriesLabels) < seriesCount*10)
|
||||||
testutil.Equals(t, 6, wt.checkNumLabels())
|
testutil.Equals(t, 14, wt.checkNumLabels())
|
||||||
}
|
}
|
||||||
|
Loading…
Reference in New Issue
Block a user