Add repair of bad index version

This commit is contained in:
Fabian Reinartz 2018-02-09 13:11:03 +01:00
parent 72d61bcf4e
commit d09283f60a
7 changed files with 128 additions and 24 deletions

View File

@ -195,7 +195,7 @@ func readMetaFile(dir string) (*BlockMeta, error) {
if err := json.Unmarshal(b, &m); err != nil {
return nil, err
}
if m.Version != 1 && m.Version != 2 {
if m.Version != 1 {
return nil, errors.Errorf("unexpected meta file version %d", m.Version)
}
@ -203,6 +203,8 @@ func readMetaFile(dir string) (*BlockMeta, error) {
}
func writeMetaFile(dir string, meta *BlockMeta) error {
meta.Version = 1
// Make any changes to the file appear atomic.
path := filepath.Join(dir, metaFilename)
tmp := path + ".tmp"
@ -253,7 +255,7 @@ func OpenBlock(dir string, pool chunkenc.Pool) (*Block, error) {
if err != nil {
return nil, err
}
ir, err := index.NewFileReader(filepath.Join(dir, "index"), meta.Version)
ir, err := index.NewFileReader(filepath.Join(dir, "index"))
if err != nil {
return nil, err
}

View File

@ -431,7 +431,6 @@ func (c *LeveledCompactor) write(dest string, meta *BlockMeta, blocks ...BlockRe
if err != nil {
return errors.Wrap(err, "open index writer")
}
meta.Version = indexw.Version
if err := c.populateBlock(blocks, meta, indexw, chunkw); err != nil {
return errors.Wrap(err, "write compaction")

3
db.go
View File

@ -188,6 +188,9 @@ func Open(dir string, l log.Logger, r prometheus.Registerer, opts *Options) (db
if opts == nil {
opts = DefaultOptions
}
if err := repairBadIndexVersion(dir); err != nil {
return nil, err
}
db = &DB{
dir: dir,

View File

@ -37,6 +37,7 @@ const (
MagicIndex = 0xBAAAD700
indexFormatV1 = 1
indexFormatV2 = 2
)
type indexWriterSeries struct {
@ -168,8 +169,6 @@ func NewWriter(fn string) (*Writer, error) {
symbols: make(map[string]uint32, 1<<13),
seriesOffsets: make(map[uint64]uint64, 1<<16),
crc32: newCRC32(),
Version: 2,
}
if err := iw.writeMeta(); err != nil {
return nil, err
@ -195,7 +194,7 @@ func (w *Writer) write(bufs ...[]byte) error {
return nil
}
// addPadding adds zero byte padding until the file size is a multiple size_unit.
// addPadding adds zero byte padding until the file size is a multiple size.
func (w *Writer) addPadding(size int) error {
p := w.pos % uint64(size)
if p == 0 {
@ -249,7 +248,7 @@ func (w *Writer) ensureStage(s indexWriterStage) error {
func (w *Writer) writeMeta() error {
w.buf1.reset()
w.buf1.putBE32(MagicIndex)
w.buf1.putByte(indexFormatV1)
w.buf1.putByte(indexFormatV2)
return w.write(w.buf1.get())
}
@ -266,7 +265,13 @@ func (w *Writer) AddSeries(ref uint64, lset labels.Labels, chunks ...chunks.Meta
if _, ok := w.seriesOffsets[ref]; ok {
return errors.Errorf("series with reference %d already added", ref)
}
// We add padding to 16 bytes to increase the addressable space we get through 4 byte
// series references.
w.addPadding(16)
if w.pos%16 != 0 {
return errors.Errorf("series write not 16-byte aligned at %d", w.pos)
}
w.seriesOffsets[ref] = w.pos / 16
w.buf2.reset()
@ -573,24 +578,20 @@ func (b realByteSlice) Sub(start, end int) ByteSlice {
}
// NewReader returns a new IndexReader on the given byte slice.
func NewReader(b ByteSlice, version int) (*Reader, error) {
return newReader(b, nil, version)
func NewReader(b ByteSlice) (*Reader, error) {
return newReader(b, nil)
}
// NewFileReader returns a new index reader against the given index file.
func NewFileReader(path string, version int) (*Reader, error) {
func NewFileReader(path string) (*Reader, error) {
f, err := fileutil.OpenMmapFile(path)
if err != nil {
return nil, err
}
return newReader(realByteSlice(f.Bytes()), f, version)
return newReader(realByteSlice(f.Bytes()), f)
}
func newReader(b ByteSlice, c io.Closer, version int) (*Reader, error) {
if version != 1 && version != 2 {
return nil, errors.Errorf("unexpected file version %d", version)
}
func newReader(b ByteSlice, c io.Closer) (*Reader, error) {
r := &Reader{
b: b,
c: c,
@ -598,16 +599,20 @@ func newReader(b ByteSlice, c io.Closer, version int) (*Reader, error) {
labels: map[string]uint32{},
postings: map[labels.Label]uint32{},
crc32: newCRC32(),
version: version,
}
// Verify magic number.
if b.Len() < 4 {
// Verify header.
if b.Len() < 5 {
return nil, errors.Wrap(errInvalidSize, "index header")
}
if m := binary.BigEndian.Uint32(r.b.Range(0, 4)); m != MagicIndex {
return nil, errors.Errorf("invalid magic number %x", m)
}
r.version = int(r.b.Range(4, 5)[0])
if r.version != 1 && r.version != 2 {
return nil, errors.Errorf("unknown index file version %d", r.version)
}
if err := r.readTOC(); err != nil {
return nil, errors.Wrap(err, "read TOC")
@ -880,8 +885,10 @@ func (r *Reader) LabelIndices() ([][]string, error) {
// Series reads the series with the given ID and writes its labels and chunks into lbls and chks.
func (r *Reader) Series(id uint64, lbls *labels.Labels, chks *[]chunks.Meta) error {
offset := id
// In version 2 series IDs are no longer exact references but series are 16-byte padded
// and the ID is the multiple of 16 of the actual position.
if r.version == 2 {
offset = 16 * id
offset = id * 16
}
d := r.decbufUvarintAt(int(offset))
if d.err() != nil {

View File

@ -160,7 +160,7 @@ func TestIndexRW_Create_Open(t *testing.T) {
testutil.Ok(t, err)
testutil.Ok(t, iw.Close())
ir, err := NewFileReader(fn, 1)
ir, err := NewFileReader(fn)
testutil.Ok(t, err)
testutil.Ok(t, ir.Close())
@ -170,7 +170,7 @@ func TestIndexRW_Create_Open(t *testing.T) {
_, err = f.WriteAt([]byte{0, 0}, 0)
testutil.Ok(t, err)
_, err = NewFileReader(dir, 1)
_, err = NewFileReader(dir)
testutil.NotOk(t, err)
}
@ -213,7 +213,7 @@ func TestIndexRW_Postings(t *testing.T) {
testutil.Ok(t, iw.Close())
ir, err := NewFileReader(fn, 2)
ir, err := NewFileReader(fn)
testutil.Ok(t, err)
p, err := ir.Postings("a", "1")
@ -331,7 +331,7 @@ func TestPersistence_index_e2e(t *testing.T) {
err = iw.Close()
testutil.Ok(t, err)
ir, err := NewFileReader(filepath.Join(dir, "index"), 2)
ir, err := NewFileReader(filepath.Join(dir, "index"))
testutil.Ok(t, err)
for p := range mi.postings {

86
repair.go Normal file
View File

@ -0,0 +1,86 @@
package tsdb
import (
"encoding/json"
"io"
"io/ioutil"
"os"
"path/filepath"
"github.com/go-kit/kit/log"
"github.com/go-kit/kit/log/level"
"github.com/oklog/ulid"
"github.com/pkg/errors"
"github.com/prometheus/tsdb/fileutil"
)
func repairBadIndexVersion(logger log.Logger, dir string) error {
// All blocks written by Prometheus 2.1 with a meta.json version of 2 are affected.
// We must actually set the index file version to 2 and revert the meta.json version back to 1.
subdirs, err := fileutil.ReadDir(dir)
if err != nil {
return err
}
for _, d := range subdirs {
// Skip non-block dirs.
if _, err := ulid.Parse(d); err != nil {
continue
}
meta, err := readBogusMetaFile(d)
if err != nil {
return err
}
if meta.Version == 1 {
continue
}
level.Info(logger).Log("msg", "fixing broken block", "ulid", meta.ULID)
repl, err := os.Create(filepath.Join(d, "index.repaired"))
if err != nil {
return err
}
broken, err := os.Open(filepath.Join(d, "index"))
if err != nil {
return err
}
if _, err := io.Copy(repl, broken); err != nil {
return err
}
// Set the 5th byte to 2 to indiciate the correct file format version.
if _, err := repl.WriteAt([]byte{2}, 4); err != nil {
return err
}
if err := fileutil.Fsync(repl); err != nil {
return err
}
if err := repl.Close(); err != nil {
return err
}
if err := renameFile(repl.Name(), broken.Name()); err != nil {
return err
}
// Reset version of meta.json to 1.
meta.Version = 1
if err := writeMetaFile(d, meta); err != nil {
return err
}
}
return nil
}
func readBogusMetaFile(dir string) (*BlockMeta, error) {
b, err := ioutil.ReadFile(filepath.Join(dir, metaFilename))
if err != nil {
return nil, err
}
var m BlockMeta
if err := json.Unmarshal(b, &m); err != nil {
return nil, err
}
if m.Version != 1 && m.Version != 2 {
return nil, errors.Errorf("unexpected meta file version %d", m.Version)
}
return &m, nil
}

7
repair_test.go Normal file
View File

@ -0,0 +1,7 @@
package tsdb
import "testing"
func TestRepairBadIndexVersion(t *testing.T) {
}