From fb48a351f0b274344eb92b1994d6302e4488f83d Mon Sep 17 00:00:00 2001 From: Matthieu MOREL Date: Wed, 8 Nov 2023 21:45:14 +0100 Subject: [PATCH] tsdb/wlog: use Go standard errors package Signed-off-by: Matthieu MOREL --- tsdb/wlog/checkpoint.go | 46 ++++++++++++++++++------------------ tsdb/wlog/checkpoint_test.go | 3 +-- tsdb/wlog/live_reader.go | 10 ++++---- tsdb/wlog/reader.go | 17 ++++++------- tsdb/wlog/wlog.go | 40 +++++++++++++++---------------- 5 files changed, 57 insertions(+), 59 deletions(-) diff --git a/tsdb/wlog/checkpoint.go b/tsdb/wlog/checkpoint.go index d64599c27..3d5b56da2 100644 --- a/tsdb/wlog/checkpoint.go +++ b/tsdb/wlog/checkpoint.go @@ -15,6 +15,7 @@ package wlog import ( + "errors" "fmt" "io" "math" @@ -25,7 +26,6 @@ import ( "github.com/go-kit/log" "github.com/go-kit/log/level" - "github.com/pkg/errors" "golang.org/x/exp/slices" "github.com/prometheus/prometheus/tsdb/chunks" @@ -102,8 +102,8 @@ func Checkpoint(logger log.Logger, w *WL, from, to int, keep func(id chunks.Head { var sgmRange []SegmentRange dir, idx, err := LastCheckpoint(w.Dir()) - if err != nil && err != record.ErrNotFound { - return nil, errors.Wrap(err, "find last checkpoint") + if err != nil && !errors.Is(err, record.ErrNotFound) { + return nil, fmt.Errorf("find last checkpoint: %w", err) } last := idx + 1 if err == nil { @@ -119,7 +119,7 @@ func Checkpoint(logger log.Logger, w *WL, from, to int, keep func(id chunks.Head sgmRange = append(sgmRange, SegmentRange{Dir: w.Dir(), First: from, Last: to}) sgmReader, err = NewSegmentsRangeReader(sgmRange...) if err != nil { - return nil, errors.Wrap(err, "create segment reader") + return nil, fmt.Errorf("create segment reader: %w", err) } defer sgmReader.Close() } @@ -128,15 +128,15 @@ func Checkpoint(logger log.Logger, w *WL, from, to int, keep func(id chunks.Head cpdirtmp := cpdir + ".tmp" if err := os.RemoveAll(cpdirtmp); err != nil { - return nil, errors.Wrap(err, "remove previous temporary checkpoint dir") + return nil, fmt.Errorf("remove previous temporary checkpoint dir: %w", err) } if err := os.MkdirAll(cpdirtmp, 0o777); err != nil { - return nil, errors.Wrap(err, "create checkpoint dir") + return nil, fmt.Errorf("create checkpoint dir: %w", err) } cp, err := New(nil, nil, cpdirtmp, w.CompressionType()) if err != nil { - return nil, errors.Wrap(err, "open checkpoint") + return nil, fmt.Errorf("open checkpoint: %w", err) } // Ensures that an early return caused by an error doesn't leave any tmp files. @@ -174,7 +174,7 @@ func Checkpoint(logger log.Logger, w *WL, from, to int, keep func(id chunks.Head case record.Series: series, err = dec.Series(rec, series) if err != nil { - return nil, errors.Wrap(err, "decode series") + return nil, fmt.Errorf("decode series: %w", err) } // Drop irrelevant series in place. repl := series[:0] @@ -192,7 +192,7 @@ func Checkpoint(logger log.Logger, w *WL, from, to int, keep func(id chunks.Head case record.Samples: samples, err = dec.Samples(rec, samples) if err != nil { - return nil, errors.Wrap(err, "decode samples") + return nil, fmt.Errorf("decode samples: %w", err) } // Drop irrelevant samples in place. repl := samples[:0] @@ -210,7 +210,7 @@ func Checkpoint(logger log.Logger, w *WL, from, to int, keep func(id chunks.Head case record.HistogramSamples: histogramSamples, err = dec.HistogramSamples(rec, histogramSamples) if err != nil { - return nil, errors.Wrap(err, "decode histogram samples") + return nil, fmt.Errorf("decode histogram samples: %w", err) } // Drop irrelevant histogramSamples in place. repl := histogramSamples[:0] @@ -228,7 +228,7 @@ func Checkpoint(logger log.Logger, w *WL, from, to int, keep func(id chunks.Head case record.Tombstones: tstones, err = dec.Tombstones(rec, tstones) if err != nil { - return nil, errors.Wrap(err, "decode deletes") + return nil, fmt.Errorf("decode deletes: %w", err) } // Drop irrelevant tombstones in place. repl := tstones[:0] @@ -249,7 +249,7 @@ func Checkpoint(logger log.Logger, w *WL, from, to int, keep func(id chunks.Head case record.Exemplars: exemplars, err = dec.Exemplars(rec, exemplars) if err != nil { - return nil, errors.Wrap(err, "decode exemplars") + return nil, fmt.Errorf("decode exemplars: %w", err) } // Drop irrelevant exemplars in place. repl := exemplars[:0] @@ -266,7 +266,7 @@ func Checkpoint(logger log.Logger, w *WL, from, to int, keep func(id chunks.Head case record.Metadata: metadata, err := dec.Metadata(rec, metadata) if err != nil { - return nil, errors.Wrap(err, "decode metadata") + return nil, fmt.Errorf("decode metadata: %w", err) } // Only keep reference to the latest found metadata for each refID. repl := 0 @@ -292,7 +292,7 @@ func Checkpoint(logger log.Logger, w *WL, from, to int, keep func(id chunks.Head // Flush records in 1 MB increments. if len(buf) > 1*1024*1024 { if err := cp.Log(recs...); err != nil { - return nil, errors.Wrap(err, "flush records") + return nil, fmt.Errorf("flush records: %w", err) } buf, recs = buf[:0], recs[:0] } @@ -300,12 +300,12 @@ func Checkpoint(logger log.Logger, w *WL, from, to int, keep func(id chunks.Head // If we hit any corruption during checkpointing, repairing is not an option. // The head won't know which series records are lost. if r.Err() != nil { - return nil, errors.Wrap(r.Err(), "read segments") + return nil, fmt.Errorf("read segments: %w", r.Err()) } // Flush remaining records. if err := cp.Log(recs...); err != nil { - return nil, errors.Wrap(err, "flush records") + return nil, fmt.Errorf("flush records: %w", err) } // Flush latest metadata records for each series. @@ -315,29 +315,29 @@ func Checkpoint(logger log.Logger, w *WL, from, to int, keep func(id chunks.Head latestMetadata = append(latestMetadata, m) } if err := cp.Log(enc.Metadata(latestMetadata, buf[:0])); err != nil { - return nil, errors.Wrap(err, "flush metadata records") + return nil, fmt.Errorf("flush metadata records: %w", err) } } if err := cp.Close(); err != nil { - return nil, errors.Wrap(err, "close checkpoint") + return nil, fmt.Errorf("close checkpoint: %w", err) } // Sync temporary directory before rename. df, err := fileutil.OpenDir(cpdirtmp) if err != nil { - return nil, errors.Wrap(err, "open temporary checkpoint directory") + return nil, fmt.Errorf("open temporary checkpoint directory: %w", err) } if err := df.Sync(); err != nil { df.Close() - return nil, errors.Wrap(err, "sync temporary checkpoint directory") + return nil, fmt.Errorf("sync temporary checkpoint directory: %w", err) } if err = df.Close(); err != nil { - return nil, errors.Wrap(err, "close temporary checkpoint directory") + return nil, fmt.Errorf("close temporary checkpoint directory: %w", err) } if err := fileutil.Replace(cpdirtmp, cpdir); err != nil { - return nil, errors.Wrap(err, "rename checkpoint directory") + return nil, fmt.Errorf("rename checkpoint directory: %w", err) } return stats, nil @@ -364,7 +364,7 @@ func listCheckpoints(dir string) (refs []checkpointRef, err error) { continue } if !fi.IsDir() { - return nil, errors.Errorf("checkpoint %s is not a directory", fi.Name()) + return nil, fmt.Errorf("checkpoint %s is not a directory", fi.Name()) } idx, err := strconv.Atoi(fi.Name()[len(checkpointPrefix):]) if err != nil { diff --git a/tsdb/wlog/checkpoint_test.go b/tsdb/wlog/checkpoint_test.go index 704a65cc1..381e09186 100644 --- a/tsdb/wlog/checkpoint_test.go +++ b/tsdb/wlog/checkpoint_test.go @@ -23,7 +23,6 @@ import ( "testing" "github.com/go-kit/log" - "github.com/pkg/errors" "github.com/stretchr/testify/require" "github.com/prometheus/prometheus/model/histogram" @@ -325,7 +324,7 @@ func TestCheckpointNoTmpFolderAfterError(t *testing.T) { // Walk the wlog dir to make sure there are no tmp folder left behind after the error. err = filepath.Walk(w.Dir(), func(path string, info os.FileInfo, err error) error { if err != nil { - return errors.Wrapf(err, "access err %q: %v", path, err) + return fmt.Errorf("access err %q: %w", path, err) } if info.IsDir() && strings.HasSuffix(info.Name(), ".tmp") { return fmt.Errorf("wlog dir contains temporary folder:%s", info.Name()) diff --git a/tsdb/wlog/live_reader.go b/tsdb/wlog/live_reader.go index a440eedf7..905bbf00d 100644 --- a/tsdb/wlog/live_reader.go +++ b/tsdb/wlog/live_reader.go @@ -16,6 +16,7 @@ package wlog import ( "encoding/binary" + "errors" "fmt" "hash/crc32" "io" @@ -24,7 +25,6 @@ import ( "github.com/go-kit/log/level" "github.com/golang/snappy" "github.com/klauspost/compress/zstd" - "github.com/pkg/errors" "github.com/prometheus/client_golang/prometheus" ) @@ -135,7 +135,7 @@ func (r *LiveReader) Next() bool { switch ok, err := r.buildRecord(); { case ok: return true - case err != nil && err != io.EOF: + case err != nil && !errors.Is(err, io.EOF): r.err = err return false } @@ -157,7 +157,7 @@ func (r *LiveReader) Next() bool { if r.writeIndex != pageSize { n, err := r.fillBuffer() - if n == 0 || (err != nil && err != io.EOF) { + if n == 0 || (err != nil && !errors.Is(err, io.EOF)) { r.err = err return false } @@ -265,7 +265,7 @@ func validateRecord(typ recType, i int) error { } return nil default: - return errors.Errorf("unexpected record type %d", typ) + return fmt.Errorf("unexpected record type %d", typ) } } @@ -322,7 +322,7 @@ func (r *LiveReader) readRecord() ([]byte, int, error) { rec := r.buf[r.readIndex+recordHeaderSize : r.readIndex+recordHeaderSize+length] if c := crc32.Checksum(rec, castagnoliTable); c != crc { - return nil, 0, errors.Errorf("unexpected checksum %x, expected %x", c, crc) + return nil, 0, fmt.Errorf("unexpected checksum %x, expected %x", c, crc) } return rec, length + recordHeaderSize, nil diff --git a/tsdb/wlog/reader.go b/tsdb/wlog/reader.go index f77b03b8e..a744b0cc4 100644 --- a/tsdb/wlog/reader.go +++ b/tsdb/wlog/reader.go @@ -16,12 +16,13 @@ package wlog import ( "encoding/binary" + "errors" + "fmt" "hash/crc32" "io" "github.com/golang/snappy" "github.com/klauspost/compress/zstd" - "github.com/pkg/errors" ) // Reader reads WAL records from an io.Reader. @@ -47,7 +48,7 @@ func NewReader(r io.Reader) *Reader { // It must not be called again after it returned false. func (r *Reader) Next() bool { err := r.next() - if errors.Is(err, io.EOF) { + if err != nil && errors.Is(err, io.EOF) { // The last WAL segment record shouldn't be torn(should be full or last). // The last record would be torn after a crash just before // the last record part could be persisted to disk. @@ -72,7 +73,7 @@ func (r *Reader) next() (err error) { i := 0 for { if _, err = io.ReadFull(r.rdr, hdr[:1]); err != nil { - return errors.Wrap(err, "read first header byte") + return fmt.Errorf("read first header byte: %w", err) } r.total++ r.curRecTyp = recTypeFromHeader(hdr[0]) @@ -95,7 +96,7 @@ func (r *Reader) next() (err error) { } n, err := io.ReadFull(r.rdr, buf[:k]) if err != nil { - return errors.Wrap(err, "read remaining zeros") + return fmt.Errorf("read remaining zeros: %w", err) } r.total += int64(n) @@ -108,7 +109,7 @@ func (r *Reader) next() (err error) { } n, err := io.ReadFull(r.rdr, hdr[1:]) if err != nil { - return errors.Wrap(err, "read remaining header") + return fmt.Errorf("read remaining header: %w", err) } r.total += int64(n) @@ -118,7 +119,7 @@ func (r *Reader) next() (err error) { ) if length > pageSize-recordHeaderSize { - return errors.Errorf("invalid record size %d", length) + return fmt.Errorf("invalid record size %d", length) } n, err = io.ReadFull(r.rdr, buf[:length]) if err != nil { @@ -127,10 +128,10 @@ func (r *Reader) next() (err error) { r.total += int64(n) if n != int(length) { - return errors.Errorf("invalid size: expected %d, got %d", length, n) + return fmt.Errorf("invalid size: expected %d, got %d", length, n) } if c := crc32.Checksum(buf[:length], castagnoliTable); c != crc { - return errors.Errorf("unexpected checksum %x, expected %x", c, crc) + return fmt.Errorf("unexpected checksum %x, expected %x", c, crc) } if isSnappyCompressed || isZstdCompressed { diff --git a/tsdb/wlog/wlog.go b/tsdb/wlog/wlog.go index 16924d249..c4305bcbf 100644 --- a/tsdb/wlog/wlog.go +++ b/tsdb/wlog/wlog.go @@ -17,6 +17,7 @@ package wlog import ( "bufio" "encoding/binary" + "errors" "fmt" "hash/crc32" "io" @@ -30,7 +31,6 @@ import ( "github.com/go-kit/log/level" "github.com/golang/snappy" "github.com/klauspost/compress/zstd" - "github.com/pkg/errors" "github.com/prometheus/client_golang/prometheus" "golang.org/x/exp/slices" @@ -137,7 +137,7 @@ func OpenWriteSegment(logger log.Logger, dir string, k int) (*Segment, error) { level.Warn(logger).Log("msg", "Last page of the wlog is torn, filling it with zeros", "segment", segName) if _, err := f.Write(make([]byte, pageSize-d)); err != nil { f.Close() - return nil, errors.Wrap(err, "zero-pad torn page") + return nil, fmt.Errorf("zero-pad torn page: %w", err) } } return &Segment{SegmentFile: f, i: k, dir: dir}, nil @@ -298,7 +298,7 @@ func NewSize(logger log.Logger, reg prometheus.Registerer, dir string, segmentSi return nil, errors.New("invalid segment size") } if err := os.MkdirAll(dir, 0o777); err != nil { - return nil, errors.Wrap(err, "create dir") + return nil, fmt.Errorf("create dir: %w", err) } if logger == nil { logger = log.NewNopLogger() @@ -331,7 +331,7 @@ func NewSize(logger log.Logger, reg prometheus.Registerer, dir string, segmentSi _, last, err := Segments(w.Dir()) if err != nil { - return nil, errors.Wrap(err, "get segment range") + return nil, fmt.Errorf("get segment range: %w", err) } // Index of the Segment we want to open and write to. @@ -414,11 +414,9 @@ func (w *WL) Repair(origErr error) error { // But that's not generally applicable if the records have any kind of causality. // Maybe as an extra mode in the future if mid-WAL corruptions become // a frequent concern. - err := errors.Cause(origErr) // So that we can pick up errors even if wrapped. - - cerr, ok := err.(*CorruptionErr) - if !ok { - return errors.Wrap(origErr, "cannot handle error") + var cerr *CorruptionErr + if !errors.As(origErr, &cerr) { + return fmt.Errorf("cannot handle error: %w", origErr) } if cerr.Segment < 0 { return errors.New("corruption error does not specify position") @@ -429,7 +427,7 @@ func (w *WL) Repair(origErr error) error { // All segments behind the corruption can no longer be used. segs, err := listSegments(w.Dir()) if err != nil { - return errors.Wrap(err, "list segments") + return fmt.Errorf("list segments: %w", err) } level.Warn(w.logger).Log("msg", "Deleting all segments newer than corrupted segment", "segment", cerr.Segment) @@ -440,14 +438,14 @@ func (w *WL) Repair(origErr error) error { // as we set the current segment to repaired file // below. if err := w.segment.Close(); err != nil { - return errors.Wrap(err, "close active segment") + return fmt.Errorf("close active segment: %w", err) } } if s.index <= cerr.Segment { continue } if err := os.Remove(filepath.Join(w.Dir(), s.name)); err != nil { - return errors.Wrapf(err, "delete segment:%v", s.index) + return fmt.Errorf("delete segment:%v: %w", s.index, err) } } // Regardless of the corruption offset, no record reaches into the previous segment. @@ -472,7 +470,7 @@ func (w *WL) Repair(origErr error) error { f, err := os.Open(tmpfn) if err != nil { - return errors.Wrap(err, "open segment") + return fmt.Errorf("open segment: %w", err) } defer f.Close() @@ -484,24 +482,24 @@ func (w *WL) Repair(origErr error) error { break } if err := w.Log(r.Record()); err != nil { - return errors.Wrap(err, "insert record") + return fmt.Errorf("insert record: %w", err) } } // We expect an error here from r.Err(), so nothing to handle. // We need to pad to the end of the last page in the repaired segment if err := w.flushPage(true); err != nil { - return errors.Wrap(err, "flush page in repair") + return fmt.Errorf("flush page in repair: %w", err) } // We explicitly close even when there is a defer for Windows to be // able to delete it. The defer is in place to close it in-case there // are errors above. if err := f.Close(); err != nil { - return errors.Wrap(err, "close corrupted file") + return fmt.Errorf("close corrupted file: %w", err) } if err := os.Remove(tmpfn); err != nil { - return errors.Wrap(err, "delete corrupted segment") + return fmt.Errorf("delete corrupted segment: %w", err) } // Explicitly close the segment we just repaired to avoid issues with Windows. @@ -553,7 +551,7 @@ func (w *WL) nextSegment(async bool) (int, error) { } next, err := CreateSegment(w.Dir(), w.segment.Index()+1) if err != nil { - return 0, errors.Wrap(err, "create new segment file") + return 0, fmt.Errorf("create new segment file: %w", err) } prev := w.segment if err := w.setSegment(next); err != nil { @@ -940,7 +938,7 @@ func NewSegmentsRangeReader(sr ...SegmentRange) (io.ReadCloser, error) { for _, sgmRange := range sr { refs, err := listSegments(sgmRange.Dir) if err != nil { - return nil, errors.Wrapf(err, "list segment in dir:%v", sgmRange.Dir) + return nil, fmt.Errorf("list segment in dir:%v: %w", sgmRange.Dir, err) } for _, r := range refs { @@ -952,7 +950,7 @@ func NewSegmentsRangeReader(sr ...SegmentRange) (io.ReadCloser, error) { } s, err := OpenReadSegment(filepath.Join(sgmRange.Dir, r.name)) if err != nil { - return nil, errors.Wrapf(err, "open segment:%v in dir:%v", r.name, sgmRange.Dir) + return nil, fmt.Errorf("open segment:%v in dir:%v: %w", r.name, sgmRange.Dir, err) } segs = append(segs, s) } @@ -1017,7 +1015,7 @@ func (r *segmentBufReader) Read(b []byte) (n int, err error) { r.off += n // If we succeeded, or hit a non-EOF, we can stop. - if err == nil || err != io.EOF { + if err == nil || !errors.Is(err, io.EOF) { return n, err }