diff --git a/checkpoint.go b/checkpoint.go index b8de5d14a..aa8170520 100644 --- a/checkpoint.go +++ b/checkpoint.go @@ -38,7 +38,7 @@ type CheckpointStats struct { TotalTombstones int // Processed tombstones including dropped ones. } -// LastCheckpoint returns the directory name of the most recent checkpoint. +// LastCheckpoint returns the directory name and index of the most recent checkpoint. // If dir does not contain any checkpoints, ErrNotFound is returned. func LastCheckpoint(dir string) (string, int, error) { files, err := ioutil.ReadDir(dir) @@ -55,18 +55,17 @@ func LastCheckpoint(dir string) (string, int, error) { if !fi.IsDir() { return "", 0, errors.Errorf("checkpoint %s is not a directory", fi.Name()) } - k, err := strconv.Atoi(fi.Name()[len(checkpointPrefix):]) + idx, err := strconv.Atoi(fi.Name()[len(checkpointPrefix):]) if err != nil { continue } - return fi.Name(), k, nil + return fi.Name(), idx, nil } return "", 0, ErrNotFound } -// DeleteCheckpoints deletes all checkpoints in dir that have an index -// below n. -func DeleteCheckpoints(dir string, n int) error { +// DeleteCheckpoints deletes all checkpoints in a directory below a given index. +func DeleteCheckpoints(dir string, maxIndex int) error { var errs MultiError files, err := ioutil.ReadDir(dir) @@ -77,8 +76,8 @@ func DeleteCheckpoints(dir string, n int) error { if !strings.HasPrefix(fi.Name(), checkpointPrefix) { continue } - k, err := strconv.Atoi(fi.Name()[len(checkpointPrefix):]) - if err != nil || k >= n { + index, err := strconv.Atoi(fi.Name()[len(checkpointPrefix):]) + if err != nil || index >= maxIndex { continue } if err := os.RemoveAll(filepath.Join(dir, fi.Name())); err != nil { @@ -90,7 +89,7 @@ func DeleteCheckpoints(dir string, n int) error { const checkpointPrefix = "checkpoint." -// Checkpoint creates a compacted checkpoint of segments in range [m, n] in the given WAL. +// Checkpoint creates a compacted checkpoint of segments in range [first, last] in the given WAL. // It includes the most recent checkpoint if it exists. // All series not satisfying keep and samples below mint are dropped. // @@ -98,7 +97,7 @@ const checkpointPrefix = "checkpoint." // segmented format as the original WAL itself. // This makes it easy to read it through the WAL package and concatenate // it with the original WAL. -func Checkpoint(w *wal.WAL, m, n int, keep func(id uint64) bool, mint int64) (*CheckpointStats, error) { +func Checkpoint(w *wal.WAL, from, to int, keep func(id uint64) bool, mint int64) (*CheckpointStats, error) { stats := &CheckpointStats{} var sr io.Reader @@ -107,27 +106,28 @@ func Checkpoint(w *wal.WAL, m, n int, keep func(id uint64) bool, mint int64) (*C // files if there is an error somewhere. var closers []io.Closer { - lastFn, k, err := LastCheckpoint(w.Dir()) + dir, idx, err := LastCheckpoint(w.Dir()) if err != nil && err != ErrNotFound { return nil, errors.Wrap(err, "find last checkpoint") } + last := idx + 1 if err == nil { - if m > k+1 { - return nil, errors.New("unexpected gap to last checkpoint") + if from > last { + return nil, fmt.Errorf("unexpected gap to last checkpoint. expected:%v, requested:%v", last, from) } // Ignore WAL files below the checkpoint. They shouldn't exist to begin with. - m = k + 1 + from = last - last, err := wal.NewSegmentsReader(filepath.Join(w.Dir(), lastFn)) + r, err := wal.NewSegmentsReader(filepath.Join(w.Dir(), dir)) if err != nil { return nil, errors.Wrap(err, "open last checkpoint") } - defer last.Close() - closers = append(closers, last) - sr = last + defer r.Close() + closers = append(closers, r) + sr = r } - segsr, err := wal.NewSegmentsRangeReader(w.Dir(), m, n) + segsr, err := wal.NewSegmentsRangeReader(w.Dir(), from, to) if err != nil { return nil, errors.Wrap(err, "create segment reader") } @@ -141,7 +141,7 @@ func Checkpoint(w *wal.WAL, m, n int, keep func(id uint64) bool, mint int64) (*C } } - cpdir := filepath.Join(w.Dir(), fmt.Sprintf("checkpoint.%06d", n)) + cpdir := filepath.Join(w.Dir(), fmt.Sprintf("checkpoint.%06d", to)) cpdirtmp := cpdir + ".tmp" if err := os.MkdirAll(cpdirtmp, 0777); err != nil { diff --git a/head.go b/head.go index 8d259fd66..d167450fb 100644 --- a/head.go +++ b/head.go @@ -418,12 +418,12 @@ func (h *Head) Init() error { } // Backfill the checkpoint first if it exists. - cp, n, err := LastCheckpoint(h.wal.Dir()) + dir, startFrom, err := LastCheckpoint(h.wal.Dir()) if err != nil && err != ErrNotFound { return errors.Wrap(err, "find last checkpoint") } if err == nil { - sr, err := wal.NewSegmentsReader(filepath.Join(h.wal.Dir(), cp)) + sr, err := wal.NewSegmentsReader(filepath.Join(h.wal.Dir(), dir)) if err != nil { return errors.Wrap(err, "open checkpoint") } @@ -434,11 +434,11 @@ func (h *Head) Init() error { if err := h.loadWAL(wal.NewReader(sr)); err != nil { return errors.Wrap(err, "backfill checkpoint") } - n++ + startFrom++ } // Backfill segments from the last checkpoint onwards - sr, err := wal.NewSegmentsRangeReader(h.wal.Dir(), n, -1) + sr, err := wal.NewSegmentsRangeReader(h.wal.Dir(), startFrom, -1) if err != nil { return errors.Wrap(err, "open WAL segments") } @@ -493,18 +493,18 @@ func (h *Head) Truncate(mint int64) (err error) { } start = time.Now() - m, n, err := h.wal.Segments() + first, last, err := h.wal.Segments() if err != nil { return errors.Wrap(err, "get segment range") } - n-- // Never consider last segment for checkpoint. - if n < 0 { + last-- // Never consider last segment for checkpoint. + if last < 0 { return nil // no segments yet. } // The lower third of segments should contain mostly obsolete samples. // If we have less than three segments, it's not worth checkpointing yet. - n = m + (n-m)/3 - if n <= m { + last = first + (last-first)/3 + if last <= first { return nil } @@ -512,18 +512,18 @@ func (h *Head) Truncate(mint int64) (err error) { return h.series.getByID(id) != nil } h.metrics.checkpointCreationTotal.Inc() - if _, err = Checkpoint(h.wal, m, n, keep, mint); err != nil { + if _, err = Checkpoint(h.wal, first, last, keep, mint); err != nil { h.metrics.checkpointCreationFail.Inc() return errors.Wrap(err, "create checkpoint") } - if err := h.wal.Truncate(n + 1); err != nil { + if err := h.wal.Truncate(last + 1); err != nil { // If truncating fails, we'll just try again at the next checkpoint. // Leftover segments will just be ignored in the future if there's a checkpoint // that supersedes them. level.Error(h.logger).Log("msg", "truncating segments failed", "err", err) } h.metrics.checkpointDeleteTotal.Inc() - if err := DeleteCheckpoints(h.wal.Dir(), n); err != nil { + if err := DeleteCheckpoints(h.wal.Dir(), last); err != nil { // Leftover old checkpoints do not cause problems down the line beyond // occupying disk space. // They will just be ignored since a higher checkpoint exists. @@ -533,7 +533,7 @@ func (h *Head) Truncate(mint int64) (err error) { h.metrics.walTruncateDuration.Observe(time.Since(start).Seconds()) level.Info(h.logger).Log("msg", "WAL checkpoint complete", - "low", m, "high", n, "duration", time.Since(start)) + "first", first, "last", last, "duration", time.Since(start)) return nil } diff --git a/wal/wal.go b/wal/wal.go index d9a59c005..20ed69e7d 100644 --- a/wal/wal.go +++ b/wal/wal.go @@ -289,13 +289,13 @@ func (w *WAL) Repair(origErr error) error { if err != nil { return errors.Wrap(err, "list segments") } - level.Warn(w.logger).Log("msg", "deleting all segments behind corruption") + level.Warn(w.logger).Log("msg", "deleting all segments behind corruption", "segment", cerr.Segment) for _, s := range segs { - if s.n <= cerr.Segment { + if s.index <= cerr.Segment { continue } - if w.segment.i == s.n { + if w.segment.i == s.index { // The active segment needs to be removed, // close it first (Windows!). Can be closed safely // as we set the current segment to repaired file @@ -304,14 +304,14 @@ func (w *WAL) Repair(origErr error) error { return errors.Wrap(err, "close active segment") } } - if err := os.Remove(filepath.Join(w.dir, s.s)); err != nil { - return errors.Wrap(err, "delete segment") + if err := os.Remove(filepath.Join(w.dir, s.name)); err != nil { + return errors.Wrapf(err, "delete segment:%v", s.index) } } // Regardless of the corruption offset, no record reaches into the previous segment. // So we can safely repair the WAL by removing the segment and re-inserting all // its records up to the corruption. - level.Warn(w.logger).Log("msg", "rewrite corrupted segment") + level.Warn(w.logger).Log("msg", "rewrite corrupted segment", "segment", cerr.Segment) fn := SegmentName(w.dir, cerr.Segment) tmpfn := fn + ".repair" @@ -523,9 +523,9 @@ func (w *WAL) log(rec []byte, final bool) error { return nil } -// Segments returns the range [m, n] of currently existing segments. -// If no segments are found, m and n are -1. -func (w *WAL) Segments() (m, n int, err error) { +// Segments returns the range [first, n] of currently existing segments. +// If no segments are found, first and n are -1. +func (w *WAL) Segments() (first, last int, err error) { refs, err := listSegments(w.dir) if err != nil { return 0, 0, err @@ -533,7 +533,7 @@ func (w *WAL) Segments() (m, n int, err error) { if len(refs) == 0 { return -1, -1, nil } - return refs[0].n, refs[len(refs)-1].n, nil + return refs[0].index, refs[len(refs)-1].index, nil } // Truncate drops all segments before i. @@ -549,10 +549,10 @@ func (w *WAL) Truncate(i int) (err error) { return err } for _, r := range refs { - if r.n >= i { + if r.index >= i { break } - if err = os.Remove(filepath.Join(w.dir, r.s)); err != nil { + if err = os.Remove(filepath.Join(w.dir, r.name)); err != nil { return err } } @@ -595,8 +595,8 @@ func (w *WAL) Close() (err error) { } type segmentRef struct { - s string - n int + name string + index int } func listSegments(dir string) (refs []segmentRef, err error) { @@ -613,11 +613,11 @@ func listSegments(dir string) (refs []segmentRef, err error) { if len(refs) > 0 && k > last+1 { return nil, errors.New("segments are not sequential") } - refs = append(refs, segmentRef{s: fn, n: k}) + refs = append(refs, segmentRef{name: fn, index: k}) last = k } sort.Slice(refs, func(i, j int) bool { - return refs[i].n < refs[j].n + return refs[i].index < refs[j].index }) return refs, nil } @@ -628,8 +628,8 @@ func NewSegmentsReader(dir string) (io.ReadCloser, error) { } // NewSegmentsRangeReader returns a new reader over the given WAL segment range. -// If m or n are -1, the range is open on the respective end. -func NewSegmentsRangeReader(dir string, m, n int) (io.ReadCloser, error) { +// If first or last are -1, the range is open on the respective end. +func NewSegmentsRangeReader(dir string, first, last int) (io.ReadCloser, error) { refs, err := listSegments(dir) if err != nil { return nil, err @@ -637,13 +637,13 @@ func NewSegmentsRangeReader(dir string, m, n int) (io.ReadCloser, error) { var segs []*Segment for _, r := range refs { - if m >= 0 && r.n < m { + if first >= 0 && r.index < first { continue } - if n >= 0 && r.n > n { + if last >= 0 && r.index > last { break } - s, err := OpenReadSegment(filepath.Join(dir, r.s)) + s, err := OpenReadSegment(filepath.Join(dir, r.name)) if err != nil { return nil, err }