tsdb/wlog: use Go standard errors package

Signed-off-by: Matthieu MOREL <matthieu.morel35@gmail.com>
This commit is contained in:
Matthieu MOREL 2023-11-08 21:45:14 +01:00
parent ab2a7bb74f
commit fb48a351f0
5 changed files with 57 additions and 59 deletions

View File

@ -15,6 +15,7 @@
package wlog
import (
"errors"
"fmt"
"io"
"math"
@ -25,7 +26,6 @@ import (
"github.com/go-kit/log"
"github.com/go-kit/log/level"
"github.com/pkg/errors"
"golang.org/x/exp/slices"
"github.com/prometheus/prometheus/tsdb/chunks"
@ -102,8 +102,8 @@ func Checkpoint(logger log.Logger, w *WL, from, to int, keep func(id chunks.Head
{
var sgmRange []SegmentRange
dir, idx, err := LastCheckpoint(w.Dir())
if err != nil && err != record.ErrNotFound {
return nil, errors.Wrap(err, "find last checkpoint")
if err != nil && !errors.Is(err, record.ErrNotFound) {
return nil, fmt.Errorf("find last checkpoint: %w", err)
}
last := idx + 1
if err == nil {
@ -119,7 +119,7 @@ func Checkpoint(logger log.Logger, w *WL, from, to int, keep func(id chunks.Head
sgmRange = append(sgmRange, SegmentRange{Dir: w.Dir(), First: from, Last: to})
sgmReader, err = NewSegmentsRangeReader(sgmRange...)
if err != nil {
return nil, errors.Wrap(err, "create segment reader")
return nil, fmt.Errorf("create segment reader: %w", err)
}
defer sgmReader.Close()
}
@ -128,15 +128,15 @@ func Checkpoint(logger log.Logger, w *WL, from, to int, keep func(id chunks.Head
cpdirtmp := cpdir + ".tmp"
if err := os.RemoveAll(cpdirtmp); err != nil {
return nil, errors.Wrap(err, "remove previous temporary checkpoint dir")
return nil, fmt.Errorf("remove previous temporary checkpoint dir: %w", err)
}
if err := os.MkdirAll(cpdirtmp, 0o777); err != nil {
return nil, errors.Wrap(err, "create checkpoint dir")
return nil, fmt.Errorf("create checkpoint dir: %w", err)
}
cp, err := New(nil, nil, cpdirtmp, w.CompressionType())
if err != nil {
return nil, errors.Wrap(err, "open checkpoint")
return nil, fmt.Errorf("open checkpoint: %w", err)
}
// Ensures that an early return caused by an error doesn't leave any tmp files.
@ -174,7 +174,7 @@ func Checkpoint(logger log.Logger, w *WL, from, to int, keep func(id chunks.Head
case record.Series:
series, err = dec.Series(rec, series)
if err != nil {
return nil, errors.Wrap(err, "decode series")
return nil, fmt.Errorf("decode series: %w", err)
}
// Drop irrelevant series in place.
repl := series[:0]
@ -192,7 +192,7 @@ func Checkpoint(logger log.Logger, w *WL, from, to int, keep func(id chunks.Head
case record.Samples:
samples, err = dec.Samples(rec, samples)
if err != nil {
return nil, errors.Wrap(err, "decode samples")
return nil, fmt.Errorf("decode samples: %w", err)
}
// Drop irrelevant samples in place.
repl := samples[:0]
@ -210,7 +210,7 @@ func Checkpoint(logger log.Logger, w *WL, from, to int, keep func(id chunks.Head
case record.HistogramSamples:
histogramSamples, err = dec.HistogramSamples(rec, histogramSamples)
if err != nil {
return nil, errors.Wrap(err, "decode histogram samples")
return nil, fmt.Errorf("decode histogram samples: %w", err)
}
// Drop irrelevant histogramSamples in place.
repl := histogramSamples[:0]
@ -228,7 +228,7 @@ func Checkpoint(logger log.Logger, w *WL, from, to int, keep func(id chunks.Head
case record.Tombstones:
tstones, err = dec.Tombstones(rec, tstones)
if err != nil {
return nil, errors.Wrap(err, "decode deletes")
return nil, fmt.Errorf("decode deletes: %w", err)
}
// Drop irrelevant tombstones in place.
repl := tstones[:0]
@ -249,7 +249,7 @@ func Checkpoint(logger log.Logger, w *WL, from, to int, keep func(id chunks.Head
case record.Exemplars:
exemplars, err = dec.Exemplars(rec, exemplars)
if err != nil {
return nil, errors.Wrap(err, "decode exemplars")
return nil, fmt.Errorf("decode exemplars: %w", err)
}
// Drop irrelevant exemplars in place.
repl := exemplars[:0]
@ -266,7 +266,7 @@ func Checkpoint(logger log.Logger, w *WL, from, to int, keep func(id chunks.Head
case record.Metadata:
metadata, err := dec.Metadata(rec, metadata)
if err != nil {
return nil, errors.Wrap(err, "decode metadata")
return nil, fmt.Errorf("decode metadata: %w", err)
}
// Only keep reference to the latest found metadata for each refID.
repl := 0
@ -292,7 +292,7 @@ func Checkpoint(logger log.Logger, w *WL, from, to int, keep func(id chunks.Head
// Flush records in 1 MB increments.
if len(buf) > 1*1024*1024 {
if err := cp.Log(recs...); err != nil {
return nil, errors.Wrap(err, "flush records")
return nil, fmt.Errorf("flush records: %w", err)
}
buf, recs = buf[:0], recs[:0]
}
@ -300,12 +300,12 @@ func Checkpoint(logger log.Logger, w *WL, from, to int, keep func(id chunks.Head
// If we hit any corruption during checkpointing, repairing is not an option.
// The head won't know which series records are lost.
if r.Err() != nil {
return nil, errors.Wrap(r.Err(), "read segments")
return nil, fmt.Errorf("read segments: %w", r.Err())
}
// Flush remaining records.
if err := cp.Log(recs...); err != nil {
return nil, errors.Wrap(err, "flush records")
return nil, fmt.Errorf("flush records: %w", err)
}
// Flush latest metadata records for each series.
@ -315,29 +315,29 @@ func Checkpoint(logger log.Logger, w *WL, from, to int, keep func(id chunks.Head
latestMetadata = append(latestMetadata, m)
}
if err := cp.Log(enc.Metadata(latestMetadata, buf[:0])); err != nil {
return nil, errors.Wrap(err, "flush metadata records")
return nil, fmt.Errorf("flush metadata records: %w", err)
}
}
if err := cp.Close(); err != nil {
return nil, errors.Wrap(err, "close checkpoint")
return nil, fmt.Errorf("close checkpoint: %w", err)
}
// Sync temporary directory before rename.
df, err := fileutil.OpenDir(cpdirtmp)
if err != nil {
return nil, errors.Wrap(err, "open temporary checkpoint directory")
return nil, fmt.Errorf("open temporary checkpoint directory: %w", err)
}
if err := df.Sync(); err != nil {
df.Close()
return nil, errors.Wrap(err, "sync temporary checkpoint directory")
return nil, fmt.Errorf("sync temporary checkpoint directory: %w", err)
}
if err = df.Close(); err != nil {
return nil, errors.Wrap(err, "close temporary checkpoint directory")
return nil, fmt.Errorf("close temporary checkpoint directory: %w", err)
}
if err := fileutil.Replace(cpdirtmp, cpdir); err != nil {
return nil, errors.Wrap(err, "rename checkpoint directory")
return nil, fmt.Errorf("rename checkpoint directory: %w", err)
}
return stats, nil
@ -364,7 +364,7 @@ func listCheckpoints(dir string) (refs []checkpointRef, err error) {
continue
}
if !fi.IsDir() {
return nil, errors.Errorf("checkpoint %s is not a directory", fi.Name())
return nil, fmt.Errorf("checkpoint %s is not a directory", fi.Name())
}
idx, err := strconv.Atoi(fi.Name()[len(checkpointPrefix):])
if err != nil {

View File

@ -23,7 +23,6 @@ import (
"testing"
"github.com/go-kit/log"
"github.com/pkg/errors"
"github.com/stretchr/testify/require"
"github.com/prometheus/prometheus/model/histogram"
@ -325,7 +324,7 @@ func TestCheckpointNoTmpFolderAfterError(t *testing.T) {
// Walk the wlog dir to make sure there are no tmp folder left behind after the error.
err = filepath.Walk(w.Dir(), func(path string, info os.FileInfo, err error) error {
if err != nil {
return errors.Wrapf(err, "access err %q: %v", path, err)
return fmt.Errorf("access err %q: %w", path, err)
}
if info.IsDir() && strings.HasSuffix(info.Name(), ".tmp") {
return fmt.Errorf("wlog dir contains temporary folder:%s", info.Name())

View File

@ -16,6 +16,7 @@ package wlog
import (
"encoding/binary"
"errors"
"fmt"
"hash/crc32"
"io"
@ -24,7 +25,6 @@ import (
"github.com/go-kit/log/level"
"github.com/golang/snappy"
"github.com/klauspost/compress/zstd"
"github.com/pkg/errors"
"github.com/prometheus/client_golang/prometheus"
)
@ -135,7 +135,7 @@ func (r *LiveReader) Next() bool {
switch ok, err := r.buildRecord(); {
case ok:
return true
case err != nil && err != io.EOF:
case err != nil && !errors.Is(err, io.EOF):
r.err = err
return false
}
@ -157,7 +157,7 @@ func (r *LiveReader) Next() bool {
if r.writeIndex != pageSize {
n, err := r.fillBuffer()
if n == 0 || (err != nil && err != io.EOF) {
if n == 0 || (err != nil && !errors.Is(err, io.EOF)) {
r.err = err
return false
}
@ -265,7 +265,7 @@ func validateRecord(typ recType, i int) error {
}
return nil
default:
return errors.Errorf("unexpected record type %d", typ)
return fmt.Errorf("unexpected record type %d", typ)
}
}
@ -322,7 +322,7 @@ func (r *LiveReader) readRecord() ([]byte, int, error) {
rec := r.buf[r.readIndex+recordHeaderSize : r.readIndex+recordHeaderSize+length]
if c := crc32.Checksum(rec, castagnoliTable); c != crc {
return nil, 0, errors.Errorf("unexpected checksum %x, expected %x", c, crc)
return nil, 0, fmt.Errorf("unexpected checksum %x, expected %x", c, crc)
}
return rec, length + recordHeaderSize, nil

View File

@ -16,12 +16,13 @@ package wlog
import (
"encoding/binary"
"errors"
"fmt"
"hash/crc32"
"io"
"github.com/golang/snappy"
"github.com/klauspost/compress/zstd"
"github.com/pkg/errors"
)
// Reader reads WAL records from an io.Reader.
@ -47,7 +48,7 @@ func NewReader(r io.Reader) *Reader {
// It must not be called again after it returned false.
func (r *Reader) Next() bool {
err := r.next()
if errors.Is(err, io.EOF) {
if err != nil && errors.Is(err, io.EOF) {
// The last WAL segment record shouldn't be torn(should be full or last).
// The last record would be torn after a crash just before
// the last record part could be persisted to disk.
@ -72,7 +73,7 @@ func (r *Reader) next() (err error) {
i := 0
for {
if _, err = io.ReadFull(r.rdr, hdr[:1]); err != nil {
return errors.Wrap(err, "read first header byte")
return fmt.Errorf("read first header byte: %w", err)
}
r.total++
r.curRecTyp = recTypeFromHeader(hdr[0])
@ -95,7 +96,7 @@ func (r *Reader) next() (err error) {
}
n, err := io.ReadFull(r.rdr, buf[:k])
if err != nil {
return errors.Wrap(err, "read remaining zeros")
return fmt.Errorf("read remaining zeros: %w", err)
}
r.total += int64(n)
@ -108,7 +109,7 @@ func (r *Reader) next() (err error) {
}
n, err := io.ReadFull(r.rdr, hdr[1:])
if err != nil {
return errors.Wrap(err, "read remaining header")
return fmt.Errorf("read remaining header: %w", err)
}
r.total += int64(n)
@ -118,7 +119,7 @@ func (r *Reader) next() (err error) {
)
if length > pageSize-recordHeaderSize {
return errors.Errorf("invalid record size %d", length)
return fmt.Errorf("invalid record size %d", length)
}
n, err = io.ReadFull(r.rdr, buf[:length])
if err != nil {
@ -127,10 +128,10 @@ func (r *Reader) next() (err error) {
r.total += int64(n)
if n != int(length) {
return errors.Errorf("invalid size: expected %d, got %d", length, n)
return fmt.Errorf("invalid size: expected %d, got %d", length, n)
}
if c := crc32.Checksum(buf[:length], castagnoliTable); c != crc {
return errors.Errorf("unexpected checksum %x, expected %x", c, crc)
return fmt.Errorf("unexpected checksum %x, expected %x", c, crc)
}
if isSnappyCompressed || isZstdCompressed {

View File

@ -17,6 +17,7 @@ package wlog
import (
"bufio"
"encoding/binary"
"errors"
"fmt"
"hash/crc32"
"io"
@ -30,7 +31,6 @@ import (
"github.com/go-kit/log/level"
"github.com/golang/snappy"
"github.com/klauspost/compress/zstd"
"github.com/pkg/errors"
"github.com/prometheus/client_golang/prometheus"
"golang.org/x/exp/slices"
@ -137,7 +137,7 @@ func OpenWriteSegment(logger log.Logger, dir string, k int) (*Segment, error) {
level.Warn(logger).Log("msg", "Last page of the wlog is torn, filling it with zeros", "segment", segName)
if _, err := f.Write(make([]byte, pageSize-d)); err != nil {
f.Close()
return nil, errors.Wrap(err, "zero-pad torn page")
return nil, fmt.Errorf("zero-pad torn page: %w", err)
}
}
return &Segment{SegmentFile: f, i: k, dir: dir}, nil
@ -298,7 +298,7 @@ func NewSize(logger log.Logger, reg prometheus.Registerer, dir string, segmentSi
return nil, errors.New("invalid segment size")
}
if err := os.MkdirAll(dir, 0o777); err != nil {
return nil, errors.Wrap(err, "create dir")
return nil, fmt.Errorf("create dir: %w", err)
}
if logger == nil {
logger = log.NewNopLogger()
@ -331,7 +331,7 @@ func NewSize(logger log.Logger, reg prometheus.Registerer, dir string, segmentSi
_, last, err := Segments(w.Dir())
if err != nil {
return nil, errors.Wrap(err, "get segment range")
return nil, fmt.Errorf("get segment range: %w", err)
}
// Index of the Segment we want to open and write to.
@ -414,11 +414,9 @@ func (w *WL) Repair(origErr error) error {
// But that's not generally applicable if the records have any kind of causality.
// Maybe as an extra mode in the future if mid-WAL corruptions become
// a frequent concern.
err := errors.Cause(origErr) // So that we can pick up errors even if wrapped.
cerr, ok := err.(*CorruptionErr)
if !ok {
return errors.Wrap(origErr, "cannot handle error")
var cerr *CorruptionErr
if !errors.As(origErr, &cerr) {
return fmt.Errorf("cannot handle error: %w", origErr)
}
if cerr.Segment < 0 {
return errors.New("corruption error does not specify position")
@ -429,7 +427,7 @@ func (w *WL) Repair(origErr error) error {
// All segments behind the corruption can no longer be used.
segs, err := listSegments(w.Dir())
if err != nil {
return errors.Wrap(err, "list segments")
return fmt.Errorf("list segments: %w", err)
}
level.Warn(w.logger).Log("msg", "Deleting all segments newer than corrupted segment", "segment", cerr.Segment)
@ -440,14 +438,14 @@ func (w *WL) Repair(origErr error) error {
// as we set the current segment to repaired file
// below.
if err := w.segment.Close(); err != nil {
return errors.Wrap(err, "close active segment")
return fmt.Errorf("close active segment: %w", err)
}
}
if s.index <= cerr.Segment {
continue
}
if err := os.Remove(filepath.Join(w.Dir(), s.name)); err != nil {
return errors.Wrapf(err, "delete segment:%v", s.index)
return fmt.Errorf("delete segment:%v: %w", s.index, err)
}
}
// Regardless of the corruption offset, no record reaches into the previous segment.
@ -472,7 +470,7 @@ func (w *WL) Repair(origErr error) error {
f, err := os.Open(tmpfn)
if err != nil {
return errors.Wrap(err, "open segment")
return fmt.Errorf("open segment: %w", err)
}
defer f.Close()
@ -484,24 +482,24 @@ func (w *WL) Repair(origErr error) error {
break
}
if err := w.Log(r.Record()); err != nil {
return errors.Wrap(err, "insert record")
return fmt.Errorf("insert record: %w", err)
}
}
// We expect an error here from r.Err(), so nothing to handle.
// We need to pad to the end of the last page in the repaired segment
if err := w.flushPage(true); err != nil {
return errors.Wrap(err, "flush page in repair")
return fmt.Errorf("flush page in repair: %w", err)
}
// We explicitly close even when there is a defer for Windows to be
// able to delete it. The defer is in place to close it in-case there
// are errors above.
if err := f.Close(); err != nil {
return errors.Wrap(err, "close corrupted file")
return fmt.Errorf("close corrupted file: %w", err)
}
if err := os.Remove(tmpfn); err != nil {
return errors.Wrap(err, "delete corrupted segment")
return fmt.Errorf("delete corrupted segment: %w", err)
}
// Explicitly close the segment we just repaired to avoid issues with Windows.
@ -553,7 +551,7 @@ func (w *WL) nextSegment(async bool) (int, error) {
}
next, err := CreateSegment(w.Dir(), w.segment.Index()+1)
if err != nil {
return 0, errors.Wrap(err, "create new segment file")
return 0, fmt.Errorf("create new segment file: %w", err)
}
prev := w.segment
if err := w.setSegment(next); err != nil {
@ -940,7 +938,7 @@ func NewSegmentsRangeReader(sr ...SegmentRange) (io.ReadCloser, error) {
for _, sgmRange := range sr {
refs, err := listSegments(sgmRange.Dir)
if err != nil {
return nil, errors.Wrapf(err, "list segment in dir:%v", sgmRange.Dir)
return nil, fmt.Errorf("list segment in dir:%v: %w", sgmRange.Dir, err)
}
for _, r := range refs {
@ -952,7 +950,7 @@ func NewSegmentsRangeReader(sr ...SegmentRange) (io.ReadCloser, error) {
}
s, err := OpenReadSegment(filepath.Join(sgmRange.Dir, r.name))
if err != nil {
return nil, errors.Wrapf(err, "open segment:%v in dir:%v", r.name, sgmRange.Dir)
return nil, fmt.Errorf("open segment:%v in dir:%v: %w", r.name, sgmRange.Dir, err)
}
segs = append(segs, s)
}
@ -1017,7 +1015,7 @@ func (r *segmentBufReader) Read(b []byte) (n int, err error) {
r.off += n
// If we succeeded, or hit a non-EOF, we can stop.
if err == nil || err != io.EOF {
if err == nil || !errors.Is(err, io.EOF) {
return n, err
}