diff --git a/wal.go b/wal.go index 207885d4f..1eed994bf 100644 --- a/wal.go +++ b/wal.go @@ -3,7 +3,6 @@ package tsdb import ( "bufio" "encoding/binary" - "fmt" "hash/crc32" "io" "math" @@ -21,8 +20,13 @@ import ( // WALEntryType indicates what data a WAL entry contains. type WALEntryType byte -// The valid WAL entry types. const ( + WALMagic = 0x43AF00EF + + // Format versioning flag of a WAL segment file. + WALFormatDefault byte = 1 + + // Entry types in a segment file. WALEntrySymbols = 1 WALEntrySeries = 2 WALEntrySamples = 3 @@ -93,9 +97,7 @@ type walHandler struct { // ReadAll consumes all entries in the WAL and triggers the registered handlers. func (w *WAL) ReadAll(h *walHandler) error { - fmt.Println("readall", w.files) for _, f := range w.files { - fmt.Println(" ", f.Name()) dec := newWALDecoder(f, h) for { @@ -151,6 +153,24 @@ func (w *WAL) initSegments() error { } w.files = append(w.files, lf) + // Consume and validate meta headers. + for _, f := range w.files { + metab := make([]byte, 8) + + if n, err := f.Read(metab); err != nil { + return errors.Wrapf(err, "validate meta %q", f.Name()) + } else if n != 8 { + return errors.Errorf("invalid header size %d in %q", n, f.Name()) + } + + if m := binary.BigEndian.Uint32(metab[:4]); m != WALMagic { + return errors.Errorf("invalid magic header %x in %q", m, f.Name()) + } + if metab[4] != WALFormatDefault { + return errors.Errorf("unknown WAL segment format %d in %q", metab[4], f.Name()) + } + } + return nil } @@ -190,9 +210,18 @@ func (w *WAL) cut() error { return err } + // Write header metadata for new file. + metab := make([]byte, 8) + binary.BigEndian.PutUint32(metab[:4], WALMagic) + metab[4] = WALFormatDefault + + if _, err := f.Write(metab); err != nil { + return err + } + w.files = append(w.files, f) w.cur = bufio.NewWriterSize(f, 4*1024*1024) - w.curN = 0 + w.curN = len(metab) return nil } @@ -391,6 +420,8 @@ type walDecoder struct { buf []byte } +// newWALDecoder returns a new decoder for the default WAL format. The meta +// headers of a segment must already have been consumed. func newWALDecoder(r io.Reader, h *walHandler) *walDecoder { return &walDecoder{ r: r,