diff --git a/tsdb/tombstones/tombstones.go b/tsdb/tombstones/tombstones.go index a94ccc5e0..d34a75af4 100644 --- a/tsdb/tombstones/tombstones.go +++ b/tsdb/tombstones/tombstones.go @@ -18,7 +18,6 @@ import ( "fmt" "hash" "hash/crc32" - "io" "io/ioutil" "os" "path/filepath" @@ -98,34 +97,28 @@ func WriteFile(logger log.Logger, dir string, tr Reader) (int64, error) { buf.Reset() // Write the meta. buf.PutBE32(MagicTombstone) - buf.PutByte(tombstoneFormatV1) n, err := f.Write(buf.Get()) if err != nil { return 0, err } size += n - mw := io.MultiWriter(f, hash) - - if err := tr.Iter(func(ref uint64, ivs Intervals) error { - for _, iv := range ivs { - buf.Reset() - - buf.PutUvarint64(ref) - buf.PutVarint64(iv.Mint) - buf.PutVarint64(iv.Maxt) - - n, err = mw.Write(buf.Get()) - if err != nil { - return err - } - size += n - } - return nil - }); err != nil { - return 0, fmt.Errorf("error writing tombstones: %v", err) + bytes, err := Encode(tr) + if err != nil { + return 0, errors.Wrap(err, "encoding tombstones") } + // Ignore first byte which is the format type. We do this for compatibility. + if _, err := hash.Write(bytes[1:]); err != nil { + return 0, errors.Wrap(err, "calculating hash for tombstones") + } + + n, err = f.Write(bytes) + if err != nil { + return 0, errors.Wrap(err, "writing tombstones") + } + size += n + n, err = f.Write(hash.Sum(nil)) if err != nil { return 0, err @@ -145,6 +138,48 @@ func WriteFile(logger log.Logger, dir string, tr Reader) (int64, error) { return int64(size), fileutil.Replace(tmp, path) } +// Encode encodes the tombstones from the reader. +// It does not attach any magic number or checksum. +func Encode(tr Reader) ([]byte, error) { + buf := encoding.Encbuf{} + buf.PutByte(tombstoneFormatV1) + err := tr.Iter(func(ref uint64, ivs Intervals) error { + for _, iv := range ivs { + buf.PutUvarint64(ref) + buf.PutVarint64(iv.Mint) + buf.PutVarint64(iv.Maxt) + } + return nil + }) + return buf.Get(), err +} + +// Decode decodes the tombstones from the bytes +// which was encoded using the Encode method. +func Decode(b []byte) (Reader, error) { + d := &encoding.Decbuf{B: b} + if flag := d.Byte(); flag != tombstoneFormatV1 { + return nil, errors.Errorf("invalid tombstone format %x", flag) + } + + if d.Err() != nil { + return nil, d.Err() + } + + stonesMap := NewMemTombstones() + for d.Len() > 0 { + k := d.Uvarint64() + mint := d.Varint64() + maxt := d.Varint64() + if d.Err() != nil { + return nil, d.Err() + } + + stonesMap.AddInterval(k, Interval{mint, maxt}) + } + return stonesMap, nil +} + // Stone holds the information on the posting and time-range // that is deleted. type Stone struct { @@ -168,34 +203,24 @@ func ReadTombstones(dir string) (Reader, int64, error) { if mg := d.Be32(); mg != MagicTombstone { return nil, 0, fmt.Errorf("invalid magic number %x", mg) } - if flag := d.Byte(); flag != tombstoneFormatV1 { - return nil, 0, fmt.Errorf("invalid tombstone format %x", flag) - } - - if d.Err() != nil { - return nil, 0, d.Err() - } // Verify checksum. hash := newCRC32() - if _, err := hash.Write(d.Get()); err != nil { + // Ignore first byte which is the format type. + if _, err := hash.Write(d.Get()[1:]); err != nil { return nil, 0, errors.Wrap(err, "write to hash") } if binary.BigEndian.Uint32(b[len(b)-4:]) != hash.Sum32() { return nil, 0, errors.New("checksum did not match") } - stonesMap := NewMemTombstones() + if d.Err() != nil { + return nil, 0, d.Err() + } - for d.Len() > 0 { - k := d.Uvarint64() - mint := d.Varint64() - maxt := d.Varint64() - if d.Err() != nil { - return nil, 0, d.Err() - } - - stonesMap.AddInterval(k, Interval{mint, maxt}) + stonesMap, err := Decode(d.Get()) + if err != nil { + return nil, 0, err } return stonesMap, int64(len(b)), nil