From 8e1f97fad42d62e0706f1e52f3016298b6db14f3 Mon Sep 17 00:00:00 2001 From: Fabian Reinartz Date: Wed, 16 May 2018 08:02:55 -0400 Subject: [PATCH 01/17] wal: add write ahead log package This adds a new WAL that's agnostic to the actual record contents. It's much simpler and should be more resilient than the existing one. Signed-off-by: Fabian Reinartz --- wal/wal.go | 627 ++++++++++++++++++++++++++++++++++++++++++++++++ wal/wal_test.go | 265 ++++++++++++++++++++ 2 files changed, 892 insertions(+) create mode 100644 wal/wal.go create mode 100644 wal/wal_test.go diff --git a/wal/wal.go b/wal/wal.go new file mode 100644 index 0000000000..ed51d1ad90 --- /dev/null +++ b/wal/wal.go @@ -0,0 +1,627 @@ +// Copyright 2017 The Prometheus Authors + +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package wal + +import ( + "bufio" + "encoding/binary" + "fmt" + "hash/crc32" + "io" + "os" + "path/filepath" + "strconv" + "sync" + "time" + + "github.com/go-kit/kit/log" + "github.com/go-kit/kit/log/level" + "github.com/pkg/errors" + "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/tsdb/fileutil" +) + +const ( + version = 1 + defaultSegmentSize = 128 * 1024 * 1024 // 128 MB + maxRecordSize = 1 * 1024 * 1024 // 1MB + pageSize = 32 * 1024 // 32KB + recordHeaderSize = 7 +) + +// The table gets initialized with sync.Once but may still cause a race +// with any other use of the crc32 package anywhere. Thus we initialize it +// before. 
+var castagnoliTable = crc32.MakeTable(crc32.Castagnoli) + +type page struct { + alloc int + flushed int + buf [pageSize]byte +} + +func (p *page) remaining() int { + return pageSize - p.alloc +} + +func (p *page) full() bool { + return pageSize-p.alloc < recordHeaderSize +} + +// WAL is a write ahead log that stores records in segment files. +// Segments are written to in pages of 32KB, with records possibly split +// across page boundaries. +// Records are never split across segments to allow full segments to be +// safely truncated. +// Segments are terminated by one full zero page to allow tailing readers +// to detect segment boundaries. +type WAL struct { + dir string + logger log.Logger + segmentSize int + mtx sync.RWMutex + segment *os.File // active segment + donePages int // pages written to the segment + page *page // active page + stopc chan chan struct{} + actorc chan func() + + fsyncDuration prometheus.Summary + pageFlushes prometheus.Counter + pageCompletions prometheus.Counter +} + +// New returns a new WAL over the given directory. 
+func New(logger log.Logger, reg prometheus.Registerer, dir string) (*WAL, error) { + return newWAL(logger, reg, dir, defaultSegmentSize) +} + +func newWAL(logger log.Logger, reg prometheus.Registerer, dir string, segmentSize int) (*WAL, error) { + if segmentSize%pageSize != 0 { + return nil, errors.New("invalid segment size") + } + if err := os.MkdirAll(dir, 0777); err != nil { + return nil, errors.Wrap(err, "create dir") + } + if logger == nil { + logger = log.NewNopLogger() + } + w := &WAL{ + dir: dir, + logger: logger, + segmentSize: segmentSize, + page: &page{}, + actorc: make(chan func(), 100), + stopc: make(chan chan struct{}), + } + w.fsyncDuration = prometheus.NewSummary(prometheus.SummaryOpts{ + Name: "prometheus_tsdb_wal_fsync_duration_seconds", + Help: "Duration of WAL fsync.", + }) + w.pageFlushes = prometheus.NewCounter(prometheus.CounterOpts{ + Name: "prometheus_tsdb_wal_page_flushes_total", + Help: "Total number of page flushes.", + }) + w.pageCompletions = prometheus.NewCounter(prometheus.CounterOpts{ + Name: "prometheus_tsdb_wal_completed_pages_total", + Help: "Total number of completed pages.", + }) + if reg != nil { + reg.MustRegister(w.fsyncDuration, w.pageFlushes, w.pageCompletions) + } + + _, j, err := w.Segments() + if err != nil { + return nil, err + } + // Fresh dir, no segments yet. + if j == -1 { + w.segment, err = os.OpenFile(SegmentName(w.dir, 0), os.O_WRONLY|os.O_CREATE|os.O_APPEND, 0666) + } else { + w.segment, err = os.OpenFile(SegmentName(w.dir, j), os.O_WRONLY|os.O_APPEND, 0666) + } + if err != nil { + return nil, err + } + go w.run() + + return w, nil +} + +// Dir returns the directory of the WAL. +func (w *WAL) Dir() string { + return w.dir +} + +func (w *WAL) run() { + for { + // Processing all pending functions has precedence over shutdown. 
+ select { + case f := <-w.actorc: + f() + default: + } + select { + case f := <-w.actorc: + f() + case donec := <-w.stopc: + close(donec) + return + } + } +} + +// SegmentName builds a segment name for the directory. +func SegmentName(dir string, i int) string { + return filepath.Join(dir, fmt.Sprintf("%06d", i)) +} + +// nextSegment creates the next segment and closes the previous one. +func (w *WAL) nextSegment() error { + if err := w.flushPage(true); err != nil { + return err + } + k, err := strconv.Atoi(filepath.Base(w.segment.Name())) + if err != nil { + return errors.Errorf("current segment %q not numerical", w.segment.Name()) + } + // TODO(fabxc): write initialization page with meta info? + next, err := os.OpenFile(SegmentName(w.dir, k+1), os.O_WRONLY|os.O_CREATE|os.O_APPEND, 0666) + if err != nil { + return errors.Wrap(err, "create new segment file") + } + prev := w.segment + w.segment = next + w.donePages = 0 + + // Don't block further writes by handling the last segment. + // TODO(fabxc): write a termination page as a marker to detect torn segments? + w.actorc <- func() { + if err := w.fsync(prev); err != nil { + level.Error(w.logger).Log("msg", "sync previous segment", "err", err) + } + if err := prev.Close(); err != nil { + level.Error(w.logger).Log("msg", "close previous segment", "err", err) + } + } + return nil +} + +// flushPage writes the new contents of the page to disk. If no more records will fit into +// the page, the remaining bytes will be set to zero and a new page will be started. +// If clear is true, this is enforced regardless of how many bytes are left in the page. +func (w *WAL) flushPage(clear bool) error { + w.pageFlushes.Inc() + + p := w.page + clear = clear || p.full() + + // No more data will fit into the page. Enqueue and clear it. 
+ if clear { + p.alloc = pageSize // write till end of page + w.pageCompletions.Inc() + } + n, err := w.segment.Write(p.buf[p.flushed:p.alloc]) + if err != nil { + return err + } + p.flushed += n + + if clear { + for i := range p.buf { + p.buf[i] = 0 + } + p.alloc = 0 + p.flushed = 0 + w.donePages++ + } + return nil +} + +type recType uint8 + +const ( + recPageTerm recType = 0 // rest of page is empty + recFull recType = 1 // full record + recFirst recType = 2 // first fragment of a record + recMiddle recType = 3 // middle fragments of a record + recLast recType = 4 // final fragment of a record +) + +func (t recType) String() string { + switch t { + case recPageTerm: + return "zero" + case recFull: + return "full" + case recFirst: + return "first" + case recMiddle: + return "middle" + case recLast: + return "last" + default: + return "" + } +} + +func (w *WAL) pagesPerSegment() int { + return w.segmentSize / pageSize +} + +// Log writes the records into the log. +// Multiple records can be passed at once to reduce writes and increase throughput. +func (w *WAL) Log(recs ...[]byte) error { + // Callers could just implement their own list record format but adding + // a bit of extra logic here frees them from that overhead. + for i, r := range recs { + if err := w.log(r, i == len(recs)-1); err != nil { + return err + } + } + return nil +} + +// log writes rec to the log and forces a flush of the current page if its +// the final record of a batch. +func (w *WAL) log(rec []byte, final bool) error { + w.mtx.Lock() + defer w.mtx.Unlock() + + // If the record is too big to fit within pages in the current + // segment, terminate the active segment and advance to the next one. + // This ensures that records do not cross segment boundaries. 
+ left := w.page.remaining() - recordHeaderSize // active page + left += (pageSize - recordHeaderSize) * (w.pagesPerSegment() - w.donePages - 1) // free pages + + if len(rec) > left { + if err := w.nextSegment(); err != nil { + return err + } + } + + // Populate as many pages as necessary to fit the record. + // Be careful to always do one pass to ensure we write zero-length records. + for i := 0; i == 0 || len(rec) > 0; i++ { + p := w.page + + // Find how much of the record we can fit into the page. + var ( + l = min(len(rec), (pageSize-p.alloc)-recordHeaderSize) + part = rec[:l] + buf = p.buf[p.alloc:] + typ recType + ) + + switch { + case i == 0 && len(part) == len(rec): + typ = recFull + case len(part) == len(rec): + typ = recLast + case i == 0: + typ = recFirst + default: + typ = recMiddle + } + + buf[0] = byte(typ) + crc := crc32.Checksum(part, castagnoliTable) + binary.BigEndian.PutUint16(buf[1:], uint16(len(part))) + binary.BigEndian.PutUint32(buf[3:], crc) + + copy(buf[7:], part) + p.alloc += len(part) + recordHeaderSize + + // If we wrote a full record, we can fit more records of the batch + // into the page before flushing it. + if final || typ != recFull || w.page.full() { + if err := w.flushPage(false); err != nil { + return err + } + } + rec = rec[l:] + } + return nil +} + +// Segments returns the range [m, n] of currently existing segments. +// If no segments are found, m and n are -1. +func (w *WAL) Segments() (m, n int, err error) { + refs, err := listSegments(w.dir) + if err != nil { + return 0, 0, err + } + if len(refs) == 0 { + return -1, -1, nil + } + return refs[0].n, refs[len(refs)-1].n, nil +} + +// Truncate drops all segments before i. 
+func (w *WAL) Truncate(i int) error { + refs, err := listSegments(w.dir) + if err != nil { + return err + } + for _, r := range refs { + if r.n >= i { + break + } + if err := os.Remove(filepath.Join(w.dir, r.s)); err != nil { + return err + } + } + return nil +} + +func (w *WAL) fsync(f *os.File) error { + start := time.Now() + err := fileutil.Fsync(f) + w.fsyncDuration.Observe(time.Since(start).Seconds()) + return err +} + +// Close flushes all writes and closes active segment. +func (w *WAL) Close() (err error) { + w.mtx.Lock() + defer w.mtx.Unlock() + + // Flush the last page and zero out all its remaining size. + // We must not flush an empty page as it would falsely signal + // the segment is done if we start writing to it again after opening. + if w.page.alloc > 0 { + if err := w.flushPage(true); err != nil { + return err + } + } + + donec := make(chan struct{}) + w.stopc <- donec + <-donec + + if err = w.fsync(w.segment); err != nil { + level.Error(w.logger).Log("msg", "sync previous segment", "err", err) + } + if err := w.segment.Close(); err != nil { + level.Error(w.logger).Log("msg", "close previous segment", "err", err) + } + + return nil +} + +type segmentRef struct { + s string + n int +} + +func listSegments(dir string) (refs []segmentRef, err error) { + files, err := fileutil.ReadDir(dir) + if err != nil { + return nil, err + } + var last int + for _, fn := range files { + k, err := strconv.Atoi(fn) + if err != nil { + continue + } + if len(refs) > 0 && k > last+1 { + return nil, errors.New("segments are not sequential") + } + refs = append(refs, segmentRef{s: fn, n: k}) + last = k + } + return refs, nil +} + +type multiReadCloser struct { + io.Reader + files []*os.File +} + +// NewSegmentsReader returns a new reader over all segments in the directory. 
+func NewSegmentsReader(dir string) (io.ReadCloser, error) { + refs, err := listSegments(dir) + if err != nil { + return nil, err + } + var rdrs []io.Reader + var files []*os.File + + for _, r := range refs { + f, err := os.Open(filepath.Join(dir, r.s)) + if err != nil { + return nil, err + } + rdrs = append(rdrs, f) + files = append(files, f) + } + return &multiReadCloser{ + Reader: io.MultiReader(rdrs...), + files: files, + }, nil +} + +// NewSegmentsRangeReader returns a new reader over the given WAL segment range. +func NewSegmentsRangeReader(dir string, m, n int) (io.ReadCloser, error) { + refs, err := listSegments(dir) + if err != nil { + return nil, err + } + var rdrs []io.Reader + var files []*os.File + + for _, r := range refs { + if r.n < m { + continue + } + if r.n > n { + break + } + f, err := os.Open(filepath.Join(dir, r.s)) + if err != nil { + return nil, err + } + rdrs = append(rdrs, f) + files = append(files, f) + } + return &multiReadCloser{ + Reader: io.MultiReader(rdrs...), + files: files, + }, nil +} + +func (r *multiReadCloser) Close() (err error) { + for _, s := range r.files { + if e := s.Close(); e != nil { + err = e + } + } + return err +} + +// Reader reads WAL records from an io.Reader. +type Reader struct { + rdr *bufio.Reader + err error + rec []byte + total int // total bytes processed. +} + +// NewReader returns a new reader. +func NewReader(r io.Reader) *Reader { + return &Reader{rdr: bufio.NewReader(r)} +} + +// Next advances the reader to the next records and returns true if it exists. +// It must not be called once after it returned false. 
+func (r *Reader) Next() bool { + err := r.next() + if err == io.EOF { + return false + } + r.err = err + return r.err == nil +} + +func (r *Reader) next() (err error) { + var hdr [recordHeaderSize]byte + var buf [pageSize]byte + r.rec = r.rec[:0] + + i := 0 + for { + hdr[0], err = r.rdr.ReadByte() + if err != nil { + return err + } + r.total++ + typ := recType(hdr[0]) + + // Gobble up zero bytes. + if typ == recPageTerm { + // We are pedantic and check whether the zeros are actually up + // to a page boundary. + // It's not strictly necessary but may catch sketchy state early. + k := pageSize - (r.total % pageSize) + if k == pageSize { + continue // initial 0 byte was last page byte + } + n, err := io.ReadFull(r.rdr, buf[:k]) + if err != nil { + return err + } + r.total += n + + for _, c := range buf[:k] { + if c != 0 { + return errors.New("unexpected non-zero byte in padded page") + } + } + continue + } + n, err := io.ReadFull(r.rdr, hdr[1:]) + if err != nil { + return err + } + r.total += n + + var ( + length = binary.BigEndian.Uint16(hdr[1:]) + crc = binary.BigEndian.Uint32(hdr[3:]) + ) + + if length > pageSize { + return errors.Errorf("invalid record size %d", length) + } + n, err = io.ReadFull(r.rdr, buf[:length]) + if err != nil { + return err + } + r.total += n + + if n != int(length) { + return errors.Errorf("invalid size: expected %d, got %d", length, n) + } + if c := crc32.Checksum(buf[:length], castagnoliTable); c != crc { + return errors.Errorf("unexpected checksum %x, expected %x", c, crc) + } + r.rec = append(r.rec, buf[:length]...) 
+ + switch typ { + case recFull: + if i != 0 { + return errors.New("unexpected full record") + } + return nil + case recFirst: + if i != 0 { + return errors.New("unexpected first record") + } + case recMiddle: + if i == 0 { + return errors.New("unexpected middle record") + } + case recLast: + if i == 0 { + return errors.New("unexpected last record") + } + return nil + default: + return errors.Errorf("unexpected record type %d", typ) + } + // Only increment i for non-zero records since we use it + // to determine valid content record sequences. + i++ + } +} + +// Err returns the last encountered error. +func (r *Reader) Err() error { + return r.err +} + +// Record returns the current record. The returned byte slice is only +// valid until the next call to Next. +func (r *Reader) Record() []byte { + return r.rec +} + +func min(i, j int) int { + if i < j { + return i + } + return j +} diff --git a/wal/wal_test.go b/wal/wal_test.go new file mode 100644 index 0000000000..913283d1c8 --- /dev/null +++ b/wal/wal_test.go @@ -0,0 +1,265 @@ +// Copyright 2017 The Prometheus Authors + +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package wal + +import ( + "bytes" + "encoding/binary" + "hash/crc32" + "io/ioutil" + "math/rand" + "os" + "testing" + + "github.com/prometheus/tsdb/testutil" +) + +func encodedRecord(t recType, b []byte) []byte { + if t == recPageTerm { + return append([]byte{0}, b...) 
+ } + r := make([]byte, recordHeaderSize) + r[0] = byte(t) + binary.BigEndian.PutUint16(r[1:], uint16(len(b))) + binary.BigEndian.PutUint32(r[3:], crc32.Checksum(b, castagnoliTable)) + return append(r, b...) +} + +// TestReader feeds the reader a stream of encoded records with different types. +func TestReader(t *testing.T) { + data := make([]byte, 100000) + _, err := rand.Read(data) + testutil.Ok(t, err) + + type record struct { + t recType + b []byte + } + cases := []struct { + t []record + exp [][]byte + fail bool + }{ + // Sequence of valid records. + { + t: []record{ + {recFull, data[0:200]}, + {recFirst, data[200:300]}, + {recLast, data[300:400]}, + {recFirst, data[400:800]}, + {recMiddle, data[800:900]}, + {recPageTerm, make([]byte, pageSize-900-recordHeaderSize*5-1)}, // exactly lines up with page boundary. + {recLast, data[900:900]}, + {recFirst, data[900:1000]}, + {recMiddle, data[1000:1200]}, + {recMiddle, data[1200:30000]}, + {recMiddle, data[30000:30001]}, + {recMiddle, data[30001:30001]}, + {recLast, data[30001:32000]}, + }, + exp: [][]byte{ + data[0:200], + data[200:400], + data[400:900], + data[900:32000], + }, + }, + // Exactly at the limit of one page minus the header size + { + t: []record{ + {recFull, data[0 : pageSize-recordHeaderSize]}, + }, + exp: [][]byte{ + data[:pageSize-recordHeaderSize], + }, + }, + // More than a full page, this exceeds our buffer and can never happen + // when written by the WAL. + { + t: []record{ + {recFull, data[0 : pageSize+1]}, + }, + fail: true, + }, + // Invalid orders of record types. + { + t: []record{{recMiddle, data[:200]}}, + fail: true, + }, + { + t: []record{{recLast, data[:200]}}, + fail: true, + }, + { + t: []record{ + {recFirst, data[:200]}, + {recFull, data[200:400]}, + }, + fail: true, + }, + { + t: []record{ + {recFirst, data[:100]}, + {recMiddle, data[100:200]}, + {recFull, data[200:400]}, + }, + fail: true, + }, + // Non-zero data after page termination. 
+ { + t: []record{ + {recFull, data[:100]}, + {recPageTerm, append(make([]byte, 1000), 1)}, + }, + exp: [][]byte{data[:100]}, + fail: true, + }, + } + for i, c := range cases { + t.Logf("test %d", i) + + var buf []byte + for _, r := range c.t { + buf = append(buf, encodedRecord(r.t, r.b)...) + } + r := NewReader(bytes.NewReader(buf)) + + for j := 0; r.Next(); j++ { + t.Logf("record %d", j) + rec := r.Record() + + if j >= len(c.exp) { + t.Fatal("received more records than inserted") + } + testutil.Equals(t, c.exp[j], rec) + } + if !c.fail && r.Err() != nil { + t.Fatalf("unexpected error: %s", r.Err()) + } + if c.fail && r.Err() == nil { + t.Fatalf("expected error but got none") + } + } +} + +func TestWAL_FuzzWriteRead(t *testing.T) { + const count = 25000 + + dir, err := ioutil.TempDir("", "walfuzz") + testutil.Ok(t, err) + defer os.RemoveAll(dir) + + w, err := newWAL(nil, nil, dir, 128*pageSize) + testutil.Ok(t, err) + + var input [][]byte + var recs [][]byte + + for i := 0; i < count; i++ { + var sz int + switch i % 5 { + case 0, 1: + sz = 50 + case 2, 3: + sz = pageSize + default: + sz = 8 * pageSize + } + rec := make([]byte, rand.Intn(sz)) + _, err := rand.Read(rec) + testutil.Ok(t, err) + + input = append(input, rec) + recs = append(recs, rec) + + // Randomly batch up records. 
+ if rand.Intn(4) < 3 { + testutil.Ok(t, w.Log(recs...)) + recs = recs[:0] + } + } + testutil.Ok(t, w.Log(recs...)) + + m, n, err := w.Segments() + testutil.Ok(t, err) + + rc, err := NewSegmentsRangeReader(dir, m, n) + testutil.Ok(t, err) + defer rc.Close() + + rdr := NewReader(rc) + + for i := 0; rdr.Next(); i++ { + rec := rdr.Record() + if i >= len(input) { + t.Fatal("read too many records") + } + if !bytes.Equal(input[i], rec) { + t.Fatalf("record %d (len %d) does not match (expected len %d)", + i, len(rec), len(input[i])) + } + } + testutil.Ok(t, rdr.Err()) +} + +func BenchmarkWAL_LogBatched(b *testing.B) { + dir, err := ioutil.TempDir("", "bench_logbatch") + testutil.Ok(b, err) + defer os.RemoveAll(dir) + + w, err := New(nil, nil, "testdir") + testutil.Ok(b, err) + defer w.Close() + + var buf [2048]byte + var recs [][]byte + b.SetBytes(2048) + + for i := 0; i < b.N; i++ { + recs = append(recs, buf[:]) + if len(recs) < 1000 { + continue + } + err := w.Log(recs...) + testutil.Ok(b, err) + recs = recs[:0] + } + // Stop timer to not count fsync time on close. + // If it's counted batched vs. single benchmarks are very similar but + // do not show burst throughput well. + b.StopTimer() +} + +func BenchmarkWAL_Log(b *testing.B) { + dir, err := ioutil.TempDir("", "bench_logsingle") + testutil.Ok(b, err) + defer os.RemoveAll(dir) + + w, err := New(nil, nil, "testdir") + testutil.Ok(b, err) + defer w.Close() + + var buf [2048]byte + b.SetBytes(2048) + + for i := 0; i < b.N; i++ { + err := w.Log(buf[:]) + testutil.Ok(b, err) + } + // Stop timer to not count fsync time on close. + // If it's counted batched vs. single benchmarks are very similar but + // do not show burst throughput well. 
+ b.StopTimer() +} From 449a2d0db7e3efd32aa748ca31ecc1044496ccd2 Mon Sep 17 00:00:00 2001 From: Fabian Reinartz Date: Thu, 17 May 2018 09:00:32 -0400 Subject: [PATCH 02/17] wal: add segment type and repair procedure Allow to repair the WAL based on the error returned by a reader during a full scan over all records. Signed-off-by: Fabian Reinartz --- wal/wal.go | 364 +++++++++++++++++++++++++++++++++++++----------- wal/wal_test.go | 98 ++++++++++++- 2 files changed, 378 insertions(+), 84 deletions(-) diff --git a/wal/wal.go b/wal/wal.go index ed51d1ad90..ed979998c5 100644 --- a/wal/wal.go +++ b/wal/wal.go @@ -20,6 +20,7 @@ import ( "fmt" "hash/crc32" "io" + "math" "os" "path/filepath" "strconv" @@ -60,19 +61,101 @@ func (p *page) full() bool { return pageSize-p.alloc < recordHeaderSize } +// Segment represents a segment file. +type Segment struct { + *os.File + dir string + i int +} + +// Index returns the index of the segment. +func (s *Segment) Index() int { + return s.i +} + +// Dir returns the directory of the segment. +func (s *Segment) Dir() string { + return s.dir +} + +// CorruptionErr is an error that's returned when corruption is encountered. +type CorruptionErr struct { + Segment int + Offset int + Err error +} + +func (e *CorruptionErr) Error() string { + if e.Segment < 0 { + return fmt.Sprintf("corruption after %d bytes: %s", e.Offset, e.Err) + } + return fmt.Sprintf("corruption in segment %d at %d: %s", e.Segment, e.Offset, e.Err) +} + +// OpenWriteSegment opens segment k in dir. The returned segment is ready for new appends. +func OpenWriteSegment(dir string, k int) (*Segment, error) { + // Only .active segments are allowed to be opened for write. + f, err := os.OpenFile(SegmentName(dir, k), os.O_WRONLY|os.O_APPEND, 0666) + if err != nil { + return nil, err + } + stat, err := f.Stat() + if err != nil { + f.Close() + return nil, err + } + // If the last page is torn, fill it with zeros. 
+ // In case it was torn after all records were written successfully, this + // will just pad the page and everything will be fine. + // If it was torn mid-record, a full read (which the caller should do anyway + // to ensure integrity) will detect it as a corruption by the end. + if d := stat.Size() % pageSize; d != 0 { + if _, err := f.Write(make([]byte, pageSize-d)); err != nil { + f.Close() + return nil, errors.Wrap(err, "zero-pad torn page") + } + } + return &Segment{File: f, i: k, dir: dir}, nil +} + +// CreateSegment creates a new segment k in dir. +func CreateSegment(dir string, k int) (*Segment, error) { + f, err := os.OpenFile(SegmentName(dir, k), os.O_WRONLY|os.O_CREATE|os.O_APPEND, 0666) + if err != nil { + return nil, err + } + return &Segment{File: f, i: k, dir: dir}, nil +} + +// OpenReadSegment opens the segment file fn for reading. +func OpenReadSegment(fn string) (*Segment, error) { + k, err := strconv.Atoi(filepath.Base(fn)) + if err != nil { + return nil, errors.New("not a valid filename") + } + f, err := os.Open(fn) + if err != nil { + return nil, err + } + return &Segment{File: f, i: k, dir: filepath.Dir(fn)}, nil +} + // WAL is a write ahead log that stores records in segment files. +// It must be read from start to end once before logging new data. +// If an error occurs during read, the repair procedure must be called +// before it's safe to do further writes. +// // Segments are written to in pages of 32KB, with records possibly split // across page boundaries. // Records are never split across segments to allow full segments to be -// safely truncated. -// Segments are terminated by one full zero page to allow tailing readers -// to detect segment boundaries. +// safely truncated. It also ensures that torn writes never corrupt records +// beyond the most recent segment. 
type WAL struct { dir string logger log.Logger segmentSize int mtx sync.RWMutex - segment *os.File // active segment + segment *Segment // active segment donePages int // pages written to the segment page *page // active page stopc chan chan struct{} @@ -85,10 +168,12 @@ type WAL struct { // New returns a new WAL over the given directory. func New(logger log.Logger, reg prometheus.Registerer, dir string) (*WAL, error) { - return newWAL(logger, reg, dir, defaultSegmentSize) + return NewSize(logger, reg, dir, defaultSegmentSize) } -func newWAL(logger log.Logger, reg prometheus.Registerer, dir string, segmentSize int) (*WAL, error) { +// NewSize returns a new WAL over the given directory. +// New segments are created with the specified size. +func NewSize(logger log.Logger, reg prometheus.Registerer, dir string, segmentSize int) (*WAL, error) { if segmentSize%pageSize != 0 { return nil, errors.New("invalid segment size") } @@ -124,16 +209,23 @@ func newWAL(logger log.Logger, reg prometheus.Registerer, dir string, segmentSiz _, j, err := w.Segments() if err != nil { - return nil, err + return nil, errors.Wrap(err, "get segment range") } // Fresh dir, no segments yet. if j == -1 { - w.segment, err = os.OpenFile(SegmentName(w.dir, 0), os.O_WRONLY|os.O_CREATE|os.O_APPEND, 0666) + if w.segment, err = CreateSegment(w.dir, 0); err != nil { + return nil, err + } } else { - w.segment, err = os.OpenFile(SegmentName(w.dir, j), os.O_WRONLY|os.O_APPEND, 0666) - } - if err != nil { - return nil, err + if w.segment, err = OpenWriteSegment(w.dir, j); err != nil { + return nil, err + } + // Correctly initialize donePages. + stat, err := w.segment.Stat() + if err != nil { + return nil, err + } + w.donePages = int(stat.Size() / pageSize) } go w.run() @@ -146,23 +238,99 @@ func (w *WAL) Dir() string { } func (w *WAL) run() { +Loop: for { - // Processing all pending functions has precedence over shutdown. 
- select { - case f := <-w.actorc: - f() - default: - } select { case f := <-w.actorc: f() case donec := <-w.stopc: - close(donec) + defer close(donec) + break Loop + } + } + // Drain and process any remaining functions. + for { + select { + case f := <-w.actorc: + f() + default: return } } } +// Repair attempts to repair the WAL based on the error. +// It discards all data behind the corruption. +func (w *WAL) Repair(err error) error { + // We could probably have a mode that only discards torn records right around + // the corruption to preserve as much data as possible. + // But that's not generally applicable if the records have any kind of causality. + // Maybe as an extra mode in the future if mid-WAL corruptions become + // a frequent concern. + cerr, ok := err.(*CorruptionErr) + if !ok { + return errors.New("cannot handle error") + } + if cerr.Segment < 0 { + return errors.New("corruption error does not specify position") + } + + level.Warn(w.logger).Log("msg", "starting corruption repair", + "segment", cerr.Segment, "offset", cerr.Offset) + + // All segments behind the corruption can no longer be used. + segs, err := listSegments(w.dir) + if err != nil { + return errors.Wrap(err, "list segments") + } + level.Warn(w.logger).Log("msg", "deleting all segments behind corruption") + + for _, s := range segs { + if s.n <= cerr.Segment { + continue + } + if err := os.Remove(filepath.Join(w.dir, s.s)); err != nil { + return errors.Wrap(err, "delete segment") + } + } + // Regardless of the corruption offset, no record reaches into the previous segment. + // So we can safely repair the WAL by removing the segment and re-inserting all + // its records up to the corruption. + level.Warn(w.logger).Log("msg", "rewrite corrupted segment") + + fn := SegmentName(w.dir, cerr.Segment) + tmpfn := fn + ".repair" + + if err := fileutil.Rename(fn, tmpfn); err != nil { + return err + } + // Create a clean segment and make it the active one. 
+ s, err := CreateSegment(w.dir, cerr.Segment) + if err != nil { + return err + } + w.segment = s + + f, err := os.Open(tmpfn) + if err != nil { + return errors.Wrap(err, "open segment") + } + defer f.Close() + r := NewReader(bufio.NewReader(f)) + + for r.Next() { + if err := w.Log(r.Record()); err != nil { + return errors.Wrap(err, "insert record") + } + } + // We expect an error here, so nothing to handle. + + if err := os.Remove(tmpfn); err != nil { + return errors.Wrap(err, "delete corrupted segment") + } + return nil +} + // SegmentName builds a segment name for the directory. func SegmentName(dir string, i int) string { return filepath.Join(dir, fmt.Sprintf("%06d", i)) @@ -170,15 +338,13 @@ func SegmentName(dir string, i int) string { // nextSegment creates the next segment and closes the previous one. func (w *WAL) nextSegment() error { - if err := w.flushPage(true); err != nil { - return err + // Only flush the current page if it actually holds data. + if w.page.alloc > 0 { + if err := w.flushPage(true); err != nil { + return err + } } - k, err := strconv.Atoi(filepath.Base(w.segment.Name())) - if err != nil { - return errors.Errorf("current segment %q not numerical", w.segment.Name()) - } - // TODO(fabxc): write initialization page with meta info? - next, err := os.OpenFile(SegmentName(w.dir, k+1), os.O_WRONLY|os.O_CREATE|os.O_APPEND, 0666) + next, err := CreateSegment(w.dir, w.segment.Index()+1) if err != nil { return errors.Wrap(err, "create new segment file") } @@ -186,8 +352,7 @@ func (w *WAL) nextSegment() error { w.segment = next w.donePages = 0 - // Don't block further writes by handling the last segment. - // TODO(fabxc): write a termination page as a marker to detect torn segments? + // Don't block further writes by fsyncing the last segment. 
w.actorc <- func() { if err := w.fsync(prev); err != nil { level.Error(w.logger).Log("msg", "sync previous segment", "err", err) @@ -366,9 +531,9 @@ func (w *WAL) Truncate(i int) error { return nil } -func (w *WAL) fsync(f *os.File) error { +func (w *WAL) fsync(f *Segment) error { start := time.Now() - err := fileutil.Fsync(f) + err := fileutil.Fsync(f.File) w.fsyncDuration.Observe(time.Since(start).Seconds()) return err } @@ -426,65 +591,65 @@ func listSegments(dir string) (refs []segmentRef, err error) { return refs, nil } -type multiReadCloser struct { - io.Reader - files []*os.File -} - // NewSegmentsReader returns a new reader over all segments in the directory. func NewSegmentsReader(dir string) (io.ReadCloser, error) { - refs, err := listSegments(dir) - if err != nil { - return nil, err - } - var rdrs []io.Reader - var files []*os.File - - for _, r := range refs { - f, err := os.Open(filepath.Join(dir, r.s)) - if err != nil { - return nil, err - } - rdrs = append(rdrs, f) - files = append(files, f) - } - return &multiReadCloser{ - Reader: io.MultiReader(rdrs...), - files: files, - }, nil + return NewSegmentsRangeReader(dir, 0, math.MaxInt64) } // NewSegmentsRangeReader returns a new reader over the given WAL segment range. +// If m or n are -1, the range is open on the respective end. 
func NewSegmentsRangeReader(dir string, m, n int) (io.ReadCloser, error) { refs, err := listSegments(dir) if err != nil { return nil, err } - var rdrs []io.Reader - var files []*os.File + var segs []*Segment for _, r := range refs { - if r.n < m { + if m >= 0 && r.n < m { continue } - if r.n > n { + if n >= 0 && r.n > n { break } - f, err := os.Open(filepath.Join(dir, r.s)) + s, err := OpenReadSegment(filepath.Join(dir, r.s)) if err != nil { return nil, err } - rdrs = append(rdrs, f) - files = append(files, f) + segs = append(segs, s) } - return &multiReadCloser{ - Reader: io.MultiReader(rdrs...), - files: files, - }, nil + return newSegmentBufReader(segs...), nil } -func (r *multiReadCloser) Close() (err error) { - for _, s := range r.files { +// Reader reads WAL records from an io.Reader. +type Reader struct { + rdr io.Reader + err error + rec []byte + total int // total bytes processed. +} + +// segmentBufReader is a buffered reader that reads in multiples of pages. +// The main purpose is that we are able to track segment and offset for +// corruption reporting. +type segmentBufReader struct { + buf *bufio.Reader + segs []*Segment + cur int + off int + more bool +} + +func newSegmentBufReader(segs ...*Segment) *segmentBufReader { + return &segmentBufReader{ + buf: bufio.NewReaderSize(nil, 16*pageSize), + segs: segs, + cur: -1, + } +} + +func (r *segmentBufReader) Close() (err error) { + for _, s := range r.segs { if e := s.Close(); e != nil { err = e } @@ -492,24 +657,42 @@ func (r *multiReadCloser) Close() (err error) { return err } -// Reader reads WAL records from an io.Reader. -type Reader struct { - rdr *bufio.Reader - err error - rec []byte - total int // total bytes processed. 
+func (r *segmentBufReader) Read(b []byte) (n int, err error) { + if !r.more { + if r.cur+1 >= len(r.segs) { + return 0, io.EOF + } + r.cur++ + r.off = 0 + r.more = true + r.buf.Reset(r.segs[r.cur]) + } + n, err = r.buf.Read(b) + r.off += n + if err != io.EOF { + return n, err + } + // Just return what we read so far, but don't signal EOF. + // Only unset more so we don't invalidate the current segment and + // offset before the next read. + r.more = false + // If no more segments are left, it's the end for the reader. + if len(r.segs) == 0 { + return n, io.EOF + } + return n, nil } // NewReader returns a new reader. func NewReader(r io.Reader) *Reader { - return &Reader{rdr: bufio.NewReader(r)} + return &Reader{rdr: r} } // Next advances the reader to the next records and returns true if it exists. // It must not be called once after it returned false. func (r *Reader) Next() bool { err := r.next() - if err == io.EOF { + if errors.Cause(err) == io.EOF { return false } r.err = err @@ -523,9 +706,8 @@ func (r *Reader) next() (err error) { i := 0 for { - hdr[0], err = r.rdr.ReadByte() - if err != nil { - return err + if _, err = io.ReadFull(r.rdr, hdr[:1]); err != nil { + return errors.Wrap(err, "read first header byte") } r.total++ typ := recType(hdr[0]) @@ -541,7 +723,7 @@ func (r *Reader) next() (err error) { } n, err := io.ReadFull(r.rdr, buf[:k]) if err != nil { - return err + return errors.Wrap(err, "read remaining zeros") } r.total += n @@ -554,7 +736,7 @@ func (r *Reader) next() (err error) { } n, err := io.ReadFull(r.rdr, hdr[1:]) if err != nil { - return err + return errors.Wrap(err, "read remaining header") } r.total += n @@ -608,9 +790,25 @@ func (r *Reader) next() (err error) { } } -// Err returns the last encountered error. +// Err returns the last encountered error wrapped in a corruption error. +// If the reader does not allow to infer a segment index and offset, a total +// offset in the reader stream will be provided. 
func (r *Reader) Err() error { - return r.err + if r.err == nil { + return nil + } + if b, ok := r.rdr.(*segmentBufReader); ok { + return &CorruptionErr{ + Err: r.err, + Segment: b.segs[b.cur].Index(), + Offset: b.off, + } + } + return &CorruptionErr{ + Err: r.err, + Segment: -1, + Offset: r.total, + } } // Record returns the current record. The returned byte slice is only diff --git a/wal/wal_test.go b/wal/wal_test.go index 913283d1c8..d1b724c7ed 100644 --- a/wal/wal_test.go +++ b/wal/wal_test.go @@ -161,7 +161,7 @@ func TestWAL_FuzzWriteRead(t *testing.T) { testutil.Ok(t, err) defer os.RemoveAll(dir) - w, err := newWAL(nil, nil, dir, 128*pageSize) + w, err := NewSize(nil, nil, dir, 128*pageSize) testutil.Ok(t, err) var input [][]byte @@ -214,6 +214,102 @@ func TestWAL_FuzzWriteRead(t *testing.T) { testutil.Ok(t, rdr.Err()) } +func TestWAL_Repair(t *testing.T) { + for name, cf := range map[string]func(f *os.File){ + "bad_fragment_sequence": func(f *os.File) { + _, err := f.Seek(pageSize, 0) + testutil.Ok(t, err) + _, err = f.Write([]byte{byte(recLast)}) + testutil.Ok(t, err) + }, + "bad_fragment_flag": func(f *os.File) { + _, err := f.Seek(pageSize, 0) + testutil.Ok(t, err) + _, err = f.Write([]byte{123}) + testutil.Ok(t, err) + }, + "bad_checksum": func(f *os.File) { + _, err := f.Seek(pageSize+4, 0) + testutil.Ok(t, err) + _, err = f.Write([]byte{0}) + testutil.Ok(t, err) + }, + "bad_length": func(f *os.File) { + _, err := f.Seek(pageSize+2, 0) + testutil.Ok(t, err) + _, err = f.Write([]byte{0}) + testutil.Ok(t, err) + }, + "bad_content": func(f *os.File) { + _, err := f.Seek(pageSize+100, 0) + testutil.Ok(t, err) + _, err = f.Write([]byte("beef")) + testutil.Ok(t, err) + }, + } { + t.Run(name, func(t *testing.T) { + dir, err := ioutil.TempDir("", "wal_repair") + testutil.Ok(t, err) + defer os.RemoveAll(dir) + + // We create 3 segments with 3 records each and then corrupt the 2nd record + // of the 2nd segment. 
+ // As a result we want a repaired WAL with the first 4 records intact. + w, err := NewSize(nil, nil, dir, 3*pageSize) + testutil.Ok(t, err) + + var records [][]byte + + for i := 1; i <= 9; i++ { + b := make([]byte, pageSize-recordHeaderSize) + b[0] = byte(i) + records = append(records, b) + testutil.Ok(t, w.Log(b)) + } + testutil.Ok(t, w.Close()) + + f, err := os.OpenFile(SegmentName(dir, 1), os.O_RDWR, 0666) + testutil.Ok(t, err) + + // Apply corruption function. + cf(f) + + testutil.Ok(t, f.Close()) + + w, err = New(nil, nil, dir) + testutil.Ok(t, err) + + sr, err := NewSegmentsReader(dir) + testutil.Ok(t, err) + r := NewReader(sr) + + for r.Next() { + } + testutil.NotOk(t, r.Err()) + + testutil.Ok(t, w.Repair(r.Err())) + + sr, err = NewSegmentsReader(dir) + testutil.Ok(t, err) + r = NewReader(sr) + + var result [][]byte + for r.Next() { + var b []byte + result = append(result, append(b, r.Record()...)) + } + testutil.Ok(t, r.Err()) + testutil.Equals(t, 4, len(result)) + + for i, r := range result { + if !bytes.Equal(records[i], r) { + t.Fatalf("record %d diverges: want %x, got %x", i, records[i][:10], r[:10]) + } + } + }) + } +} + func BenchmarkWAL_LogBatched(b *testing.B) { dir, err := ioutil.TempDir("", "bench_logbatch") testutil.Ok(b, err) From 008399a6e04575745fc52c81b0b1db9abd569f35 Mon Sep 17 00:00:00 2001 From: Fabian Reinartz Date: Thu, 17 May 2018 09:02:47 -0400 Subject: [PATCH 03/17] Add checkpointing of WAL segments Create checkpoints from a sequence of WAL segments while filtering out obsolete data. The checkpoint format is again a sequence of WAL segments, which allows us to reuse the serialization format and implementation. 
Signed-off-by: Fabian Reinartz --- checkpoint.go | 279 +++++++++++++++++++++++++++++++++++++++++++ checkpoint_test.go | 182 ++++++++++++++++++++++++++++ fileutil/fileutil.go | 23 ++++ record.go | 213 +++++++++++++++++++++++++++++++++ record_test.go | 73 +++++++++++ 5 files changed, 770 insertions(+) create mode 100644 checkpoint.go create mode 100644 checkpoint_test.go create mode 100644 record.go create mode 100644 record_test.go diff --git a/checkpoint.go b/checkpoint.go new file mode 100644 index 0000000000..2ab5f8d95c --- /dev/null +++ b/checkpoint.go @@ -0,0 +1,279 @@ +// Copyright 2018 The Prometheus Authors + +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package tsdb + +import ( + "fmt" + "io" + "io/ioutil" + "os" + "path/filepath" + "strconv" + "strings" + + "github.com/go-kit/kit/log" + "github.com/go-kit/kit/log/level" + "github.com/pkg/errors" + "github.com/prometheus/tsdb/fileutil" + "github.com/prometheus/tsdb/wal" +) + +// CheckpointStats returns stats about a created checkpoint. +type CheckpointStats struct { + DroppedSeries int + DroppedSamples int + DroppedTombstones int + TotalSeries int + TotalSamples int + TotalTombstones int +} + +// LastCheckpoint returns the directory name of the most recent checkpoint. +// If dir does not contain any checkpoints, ErrNotFound is returned. 
+func LastCheckpoint(dir string) (string, int, error) { + files, err := ioutil.ReadDir(dir) + if err != nil { + return "", 0, err + } + // Traverse list backwards since there may be multiple checkpoints left. + for i := len(files) - 1; i >= 0; i-- { + fi := files[i] + + if !strings.HasPrefix(fi.Name(), checkpointPrefix) { + continue + } + if !fi.IsDir() { + return "", 0, errors.Errorf("checkpoint %s is not a directory", fi.Name()) + } + k, err := strconv.Atoi(fi.Name()[len(checkpointPrefix):]) + if err != nil { + continue + } + return fi.Name(), k, nil + } + return "", 0, ErrNotFound +} + +// DeleteCheckpoints deletes all checkpoints in dir that have an index +// below n. +func DeleteCheckpoints(dir string, n int) error { + var errs MultiError + + files, err := ioutil.ReadDir(dir) + if err != nil { + return err + } + for _, fi := range files { + if !strings.HasPrefix(fi.Name(), checkpointPrefix) { + continue + } + k, err := strconv.Atoi(fi.Name()[len(checkpointPrefix):]) + if err != nil || k >= n { + continue + } + if err := os.RemoveAll(filepath.Join(dir, fi.Name())); err != nil { + errs.Add(err) + } + } + return errs.Err() +} + +const checkpointPrefix = "checkpoint." + +// Checkpoint creates a compacted checkpoint of segments in range [m, n] in the given WAL. +// It includes the most recent checkpoint if it exists. +// All series not satisfying keep and samples below mint are dropped. +// +// The checkpoint is stored in a directory named checkpoint.N in the same +// segmented format as the original WAL itself. +// This makes it easy to read it through the WAL package and concatenate +// it with the original WAL. +// +// Non-critical errors are logged and not returned. 
+func Checkpoint(logger log.Logger, w *wal.WAL, m, n int, keep func(id uint64) bool, mint int64) (*CheckpointStats, error) { + if logger == nil { + logger = log.NewNopLogger() + } + stats := &CheckpointStats{} + + var sr io.Reader + { + lastFn, k, err := LastCheckpoint(w.Dir()) + if err != nil && err != ErrNotFound { + return nil, errors.Wrap(err, "find last checkpoint") + } + if err == nil { + if m > k+1 { + return nil, errors.New("unexpected gap to last checkpoint") + } + // Ignore WAL files below the checkpoint. They shouldn't exist to begin with. + m = k + 1 + + last, err := wal.NewSegmentsReader(filepath.Join(w.Dir(), lastFn)) + if err != nil { + return nil, errors.Wrap(err, "open last checkpoint") + } + defer last.Close() + sr = last + } + + segs, err := wal.NewSegmentsRangeReader(w.Dir(), m, n) + if err != nil { + return nil, errors.Wrap(err, "create segment reader") + } + defer segs.Close() + + if sr != nil { + sr = io.MultiReader(sr, segs) + } else { + sr = segs + } + } + + cpdir := filepath.Join(w.Dir(), fmt.Sprintf("checkpoint.%06d", n)) + cpdirtmp := cpdir + ".tmp" + + if err := os.MkdirAll(cpdirtmp, 0777); err != nil { + return nil, errors.Wrap(err, "create checkpoint dir") + } + cp, err := wal.New(nil, nil, cpdirtmp) + if err != nil { + return nil, errors.Wrap(err, "open checkpoint") + } + + r := wal.NewReader(sr) + + var ( + series []RefSeries + samples []RefSample + tstones []Stone + dec RecordDecoder + enc RecordEncoder + buf []byte + recs [][]byte + ) + for r.Next() { + series, samples, tstones = series[:0], samples[:0], tstones[:0] + + // We don't reset the buffer since we batch up multiple records + // before writing them to the checkpoint. + // Remember where the record for this iteration starts. + start := len(buf) + rec := r.Record() + + switch dec.Type(rec) { + case RecordSeries: + series, err = dec.Series(rec, series) + if err != nil { + return nil, errors.Wrap(err, "decode series") + } + // Drop irrelevant series in place. 
+ repl := series[:0] + for _, s := range series { + if keep(s.Ref) { + repl = append(repl, s) + } + } + if len(repl) > 0 { + buf = enc.Series(repl, buf) + } + stats.TotalSeries += len(series) + stats.DroppedSeries += len(series) - len(repl) + + case RecordSamples: + samples, err = dec.Samples(rec, samples) + if err != nil { + return nil, errors.Wrap(err, "decode samples") + } + // Drop irrelevant samples in place. + repl := samples[:0] + for _, s := range samples { + if s.T >= mint { + repl = append(repl, s) + } + } + if len(repl) > 0 { + buf = enc.Samples(repl, buf) + } + stats.TotalSamples += len(samples) + stats.DroppedSamples += len(samples) - len(repl) + + case RecordTombstones: + tstones, err = dec.Tombstones(rec, tstones) + if err != nil { + return nil, errors.Wrap(err, "decode deletes") + } + // Drop irrelevant tombstones in place. + repl := tstones[:0] + for _, s := range tstones { + for _, iv := range s.intervals { + if iv.Maxt >= mint { + repl = append(repl, s) + break + } + } + } + if len(repl) > 0 { + buf = enc.Tombstones(repl, buf) + } + stats.TotalTombstones += len(tstones) + stats.DroppedTombstones += len(tstones) - len(repl) + + default: + return nil, errors.New("invalid record type") + } + if len(buf[start:]) == 0 { + continue // All contents discarded. + } + recs = append(recs, buf[start:]) + + // Flush records in 1 MB increments. + if len(buf) > 1*1024*1024 { + if err := cp.Log(recs...); err != nil { + return nil, errors.Wrap(err, "flush records") + } + buf, recs = buf[:0], recs[:0] + } + } + // If we hit any corruption during checkpointing, repairing is not an option. + // The head won't know which series records are lost. + if r.Err() != nil { + return nil, errors.Wrap(r.Err(), "read segments") + } + + // Flush remaining records. 
+ if err := cp.Log(recs...); err != nil { + return nil, errors.Wrap(err, "flush records") + } + if err := cp.Close(); err != nil { + return nil, errors.Wrap(err, "close checkpoint") + } + if err := fileutil.Rename(cpdirtmp, cpdir); err != nil { + return nil, errors.Wrap(err, "rename checkpoint file") + } + if err := w.Truncate(n + 1); err != nil { + // If truncating fails, we'll just try again at the next checkpoint. + // Leftover segments will just be ignored in the future if there's a checkpoint + // that supersedes them. + level.Error(logger).Log("msg", "truncating segments failed", "err", err) + } + if err := DeleteCheckpoints(w.Dir(), n); err != nil { + // Leftover old checkpoints do not cause problems down the line beyond + // occupying disk space. + // They will just be ignored since a higher checkpoint exists. + level.Error(logger).Log("msg", "delete old checkpoints", "err", err) + } + return stats, nil +} diff --git a/checkpoint_test.go b/checkpoint_test.go new file mode 100644 index 0000000000..074bb46e8c --- /dev/null +++ b/checkpoint_test.go @@ -0,0 +1,182 @@ +// Copyright 2018 The Prometheus Authors + +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+ +package tsdb + +import ( + "fmt" + "io/ioutil" + "os" + "path/filepath" + "testing" + + "github.com/prometheus/tsdb/fileutil" + "github.com/prometheus/tsdb/labels" + "github.com/prometheus/tsdb/testutil" + "github.com/prometheus/tsdb/wal" +) + +func TestLastCheckpoint(t *testing.T) { + dir, err := ioutil.TempDir("", "test_checkpoint") + testutil.Ok(t, err) + defer os.RemoveAll(dir) + + s, k, err := LastCheckpoint(dir) + testutil.Equals(t, ErrNotFound, err) + + testutil.Ok(t, os.MkdirAll(filepath.Join(dir, "checkpoint.0000"), 0777)) + s, k, err = LastCheckpoint(dir) + testutil.Ok(t, err) + testutil.Equals(t, "checkpoint.0000", s) + testutil.Equals(t, 0, k) + + testutil.Ok(t, os.MkdirAll(filepath.Join(dir, "checkpoint.xyz"), 0777)) + s, k, err = LastCheckpoint(dir) + testutil.Ok(t, err) + testutil.Equals(t, "checkpoint.0000", s) + testutil.Equals(t, 0, k) + + testutil.Ok(t, os.MkdirAll(filepath.Join(dir, "checkpoint.1"), 0777)) + s, k, err = LastCheckpoint(dir) + testutil.Ok(t, err) + testutil.Equals(t, "checkpoint.1", s) + testutil.Equals(t, 1, k) + + testutil.Ok(t, os.MkdirAll(filepath.Join(dir, "checkpoint.1000"), 0777)) + s, k, err = LastCheckpoint(dir) + testutil.Ok(t, err) + testutil.Equals(t, "checkpoint.1000", s) + testutil.Equals(t, 1000, k) +} + +func TestDeleteCheckpoints(t *testing.T) { + dir, err := ioutil.TempDir("", "test_checkpoint") + testutil.Ok(t, err) + defer os.RemoveAll(dir) + + testutil.Ok(t, DeleteCheckpoints(dir, 0)) + + testutil.Ok(t, os.MkdirAll(filepath.Join(dir, "checkpoint.00"), 0777)) + testutil.Ok(t, os.MkdirAll(filepath.Join(dir, "checkpoint.01"), 0777)) + testutil.Ok(t, os.MkdirAll(filepath.Join(dir, "checkpoint.02"), 0777)) + testutil.Ok(t, os.MkdirAll(filepath.Join(dir, "checkpoint.03"), 0777)) + + testutil.Ok(t, DeleteCheckpoints(dir, 2)) + + files, err := fileutil.ReadDir(dir) + testutil.Ok(t, err) + testutil.Equals(t, []string{"checkpoint.02", "checkpoint.03"}, files) +} + +func TestCheckpoint(t *testing.T) { + dir, err := 
ioutil.TempDir("", "test_checkpoint") + testutil.Ok(t, err) + fmt.Println(dir) + + var enc RecordEncoder + // Create a dummy segment to bump the initial number. + seg, err := wal.CreateSegment(dir, 100) + testutil.Ok(t, err) + testutil.Ok(t, seg.Close()) + + // Manually create checkpoint for 99 and earlier. + w, err := wal.New(nil, nil, filepath.Join(dir, "checkpoint.0099")) + testutil.Ok(t, err) + + // Add some data we expect to be around later. + err = w.Log(enc.Series([]RefSeries{ + {Ref: 0, Labels: labels.FromStrings("a", "b", "c", "0")}, + {Ref: 1, Labels: labels.FromStrings("a", "b", "c", "1")}, + }, nil)) + testutil.Ok(t, err) + testutil.Ok(t, w.Close()) + + // Start a WAL and write records to it as usual. + w, err = wal.NewSize(nil, nil, dir, 64*1024) + testutil.Ok(t, err) + + var last int64 + for i := 0; ; i++ { + _, n, err := w.Segments() + testutil.Ok(t, err) + if n >= 106 { + break + } + // Write some series initially. + if i == 0 { + b := enc.Series([]RefSeries{ + {Ref: 2, Labels: labels.FromStrings("a", "b", "c", "2")}, + {Ref: 3, Labels: labels.FromStrings("a", "b", "c", "3")}, + {Ref: 4, Labels: labels.FromStrings("a", "b", "c", "4")}, + {Ref: 5, Labels: labels.FromStrings("a", "b", "c", "5")}, + }, nil) + testutil.Ok(t, w.Log(b)) + } + // Write samples until the WAL has enough segments. + // Make them have drifting timestamps within a record to see that they + // get filtered properly. + b := enc.Samples([]RefSample{ + {Ref: 0, T: last, V: float64(i)}, + {Ref: 1, T: last + 10000, V: float64(i)}, + {Ref: 2, T: last + 20000, V: float64(i)}, + {Ref: 3, T: last + 30000, V: float64(i)}, + }, nil) + testutil.Ok(t, w.Log(b)) + + last += 100 + } + testutil.Ok(t, w.Close()) + + stats, err = Checkpoint(nil, w, 100, 106, func(x uint64) bool { + return x%2 == 0 + }, last/2) + testutil.Ok(t, err) + testutil.Equals(t, 106, stats.HighSegment) + + // Only the new checkpoint should be left. 
+ files, err := fileutil.ReadDir(dir) + testutil.Ok(t, err) + testutil.Equals(t, 1, len(files)) + testutil.Equals(t, "checkpoint.000106", files[0]) + + sr, err := wal.NewSegmentsReader(filepath.Join(dir, "checkpoint.000106")) + testutil.Ok(t, err) + defer sr.Close() + + var dec RecordDecoder + var series []RefSeries + r := wal.NewReader(sr) + + for r.Next() { + rec := r.Record() + + switch dec.Type(rec) { + case RecordSeries: + series, err = dec.Series(rec, series) + testutil.Ok(t, err) + case RecordSamples: + samples, err := dec.Samples(rec, nil) + testutil.Ok(t, err) + for _, s := range samples { + testutil.Assert(t, s.T >= last/2, "sample with wrong timestamp") + } + } + } + testutil.Ok(t, r.Err()) + testutil.Equals(t, []RefSeries{ + {Ref: 0, Labels: labels.FromStrings("a", "b", "c", "0")}, + {Ref: 2, Labels: labels.FromStrings("a", "b", "c", "2")}, + {Ref: 4, Labels: labels.FromStrings("a", "b", "c", "4")}, + }, series) +} diff --git a/fileutil/fileutil.go b/fileutil/fileutil.go index c2c25842ad..397858958b 100644 --- a/fileutil/fileutil.go +++ b/fileutil/fileutil.go @@ -6,6 +6,7 @@ package fileutil import ( "os" + "path/filepath" "sort" ) @@ -23,3 +24,25 @@ func ReadDir(dirpath string) ([]string, error) { sort.Strings(names) return names, nil } + +// Rename safely renames a file. +func Rename(from, to string) error { + if err := os.RemoveAll(to); err != nil { + return err + } + if err := os.Rename(from, to); err != nil { + return err + } + + // Directory was renamed; sync parent dir to persist rename. 
+ pdir, err := OpenDir(filepath.Dir(to)) + if err != nil { + return err + } + + if err = Fsync(pdir); err != nil { + pdir.Close() + return err + } + return pdir.Close() +} diff --git a/record.go b/record.go new file mode 100644 index 0000000000..c8cc7a5043 --- /dev/null +++ b/record.go @@ -0,0 +1,213 @@ +// Copyright 2018 The Prometheus Authors + +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package tsdb + +import ( + "math" + "sort" + + "github.com/pkg/errors" + "github.com/prometheus/tsdb/labels" +) + +// RecordType represents the data type of a record. +type RecordType uint8 + +const ( + RecordInvalid RecordType = 255 + RecordSeries RecordType = 1 + RecordSamples RecordType = 2 + RecordTombstones RecordType = 3 +) + +type RecordLogger interface { + Log(recs ...[]byte) error +} + +type RecordReader interface { + Next() bool + Err() error + Record() []byte +} + +// RecordDecoder decodes series, sample, and tombstone records. +// The zero value is ready to use. +type RecordDecoder struct { +} + +// Type returns the type of the record. +// Return RecordInvalid if no valid record type is found. +func (d *RecordDecoder) Type(rec []byte) RecordType { + if len(rec) < 1 { + return RecordInvalid + } + switch t := RecordType(rec[0]); t { + case RecordSeries, RecordSamples, RecordTombstones: + return t + } + return RecordInvalid +} + +// Series appends series in rec to the given slice. 
+func (d *RecordDecoder) Series(rec []byte, series []RefSeries) ([]RefSeries, error) { + dec := decbuf{b: rec} + + if RecordType(dec.byte()) != RecordSeries { + return nil, errors.New("invalid record type") + } + for len(dec.b) > 0 && dec.err() == nil { + ref := dec.be64() + + lset := make(labels.Labels, dec.uvarint()) + + for i := range lset { + lset[i].Name = dec.uvarintStr() + lset[i].Value = dec.uvarintStr() + } + sort.Sort(lset) + + series = append(series, RefSeries{ + Ref: ref, + Labels: lset, + }) + } + if dec.err() != nil { + return nil, dec.err() + } + if len(dec.b) > 0 { + return nil, errors.Errorf("unexpected %d bytes left in entry", len(dec.b)) + } + return series, nil +} + +// Samples appends samples in rec to the given slice. +func (d *RecordDecoder) Samples(rec []byte, samples []RefSample) ([]RefSample, error) { + dec := decbuf{b: rec} + + if RecordType(dec.byte()) != RecordSamples { + return nil, errors.New("invalid record type") + } + if dec.len() == 0 { + return samples, nil + } + var ( + baseRef = dec.be64() + baseTime = dec.be64int64() + ) + for len(dec.b) > 0 && dec.err() == nil { + dref := dec.varint64() + dtime := dec.varint64() + val := dec.be64() + + samples = append(samples, RefSample{ + Ref: uint64(int64(baseRef) + dref), + T: baseTime + dtime, + V: math.Float64frombits(val), + }) + } + + if dec.err() != nil { + return nil, errors.Wrapf(dec.err(), "decode error after %d samples", len(samples)) + } + if len(dec.b) > 0 { + return nil, errors.Errorf("unexpected %d bytes left in entry", len(dec.b)) + } + return samples, nil +} + +// Tombstones appends tombstones in rec to the given slice. 
+func (d *RecordDecoder) Tombstones(rec []byte, tstones []Stone) ([]Stone, error) { + dec := decbuf{b: rec} + + if RecordType(dec.byte()) != RecordTombstones { + return nil, errors.New("invalid record type") + } + for dec.len() > 0 && dec.err() == nil { + tstones = append(tstones, Stone{ + ref: dec.be64(), + intervals: Intervals{ + {Mint: dec.varint64(), Maxt: dec.varint64()}, + }, + }) + } + if dec.err() != nil { + return nil, dec.err() + } + if len(dec.b) > 0 { + return nil, errors.Errorf("unexpected %d bytes left in entry", len(dec.b)) + } + return tstones, nil +} + +// RecordEncoder encodes series, sample, and tombstones records. +// The zero value is ready to use. +type RecordEncoder struct { +} + +// Series appends the encoded series to b and returns the resulting slice. +func (e *RecordEncoder) Series(series []RefSeries, b []byte) []byte { + buf := encbuf{b: b} + buf.putByte(byte(RecordSeries)) + + for _, s := range series { + buf.putBE64(s.Ref) + buf.putUvarint(len(s.Labels)) + + for _, l := range s.Labels { + buf.putUvarintStr(l.Name) + buf.putUvarintStr(l.Value) + } + } + return buf.get() +} + +// Samples appends the encoded samples to b and returns the resulting slice. +func (e *RecordEncoder) Samples(samples []RefSample, b []byte) []byte { + buf := encbuf{b: b} + buf.putByte(byte(RecordSamples)) + + if len(samples) == 0 { + return buf.get() + } + + // Store base timestamp and base reference number of first sample. + // All samples encode their timestamp and ref as delta to those. + first := samples[0] + + buf.putBE64(first.Ref) + buf.putBE64int64(first.T) + + for _, s := range samples { + buf.putVarint64(int64(s.Ref) - int64(first.Ref)) + buf.putVarint64(s.T - first.T) + buf.putBE64(math.Float64bits(s.V)) + } + return buf.get() +} + +// Tombstones appends the encoded tombstones to b and returns the resulting slice. 
+func (e *RecordEncoder) Tombstones(tstones []Stone, b []byte) []byte { + buf := encbuf{b: b} + buf.putByte(byte(RecordTombstones)) + + for _, s := range tstones { + for _, iv := range s.intervals { + buf.putBE64(s.ref) + buf.putVarint64(iv.Mint) + buf.putVarint64(iv.Maxt) + } + } + return buf.get() +} diff --git a/record_test.go b/record_test.go new file mode 100644 index 0000000000..4257fc0c52 --- /dev/null +++ b/record_test.go @@ -0,0 +1,73 @@ +// Copyright 2018 The Prometheus Authors + +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package tsdb + +import ( + "testing" + + "github.com/prometheus/tsdb/labels" + "github.com/prometheus/tsdb/testutil" +) + +func TestRecord_EncodeDecode(t *testing.T) { + var enc RecordEncoder + var dec RecordDecoder + + series := []RefSeries{ + { + Ref: 100, + Labels: labels.FromStrings("abc", "def", "123", "456"), + }, { + Ref: 1, + Labels: labels.FromStrings("abc", "def2", "1234", "4567"), + }, { + Ref: 435245, + Labels: labels.FromStrings("xyz", "def", "foo", "bar"), + }, + } + decSeries, err := dec.Series(enc.Series(series, nil), nil) + testutil.Ok(t, err) + testutil.Equals(t, series, decSeries) + + samples := []RefSample{ + {Ref: 0, T: 12423423, V: 1.2345}, + {Ref: 123, T: -1231, V: -123}, + {Ref: 2, T: 0, V: 99999}, + } + decSamples, err := dec.Samples(enc.Samples(samples, nil), nil) + testutil.Ok(t, err) + testutil.Equals(t, samples, decSamples) + + // Intervals get split up into single entries. 
So we don't get back exactly + // what we put in. + tstones := []Stone{ + {ref: 123, intervals: Intervals{ + {Mint: -1000, Maxt: 1231231}, + {Mint: 5000, Maxt: 0}, + }}, + {ref: 13, intervals: Intervals{ + {Mint: -1000, Maxt: -11}, + {Mint: 5000, Maxt: 1000}, + }}, + } + decTstones, err := dec.Tombstones(enc.Tombstones(tstones, nil), nil) + testutil.Ok(t, err) + testutil.Equals(t, []Stone{ + {ref: 123, intervals: Intervals{{Mint: -1000, Maxt: 1231231}}}, + {ref: 123, intervals: Intervals{{Mint: 5000, Maxt: 0}}}, + {ref: 13, intervals: Intervals{{Mint: -1000, Maxt: -11}}}, + {ref: 13, intervals: Intervals{{Mint: 5000, Maxt: 1000}}}, + }, decTstones) +} From def912ce0ef48b13a4bf0647644eb2d1e21a72c8 Mon Sep 17 00:00:00 2001 From: Fabian Reinartz Date: Thu, 17 May 2018 09:04:32 -0400 Subject: [PATCH 04/17] Integrate new WAL and checkpoints Remove the old WAL and drop in the new one Signed-off-by: Fabian Reinartz --- checkpoint_test.go | 6 +- db.go | 7 +- head.go | 270 ++++++++++++++++++++++++++++++++++----------- head_test.go | 116 +++++++++++-------- 4 files changed, 285 insertions(+), 114 deletions(-) diff --git a/checkpoint_test.go b/checkpoint_test.go index 074bb46e8c..daa54df194 100644 --- a/checkpoint_test.go +++ b/checkpoint_test.go @@ -15,7 +15,6 @@ package tsdb import ( - "fmt" "io/ioutil" "os" "path/filepath" @@ -82,7 +81,7 @@ func TestDeleteCheckpoints(t *testing.T) { func TestCheckpoint(t *testing.T) { dir, err := ioutil.TempDir("", "test_checkpoint") testutil.Ok(t, err) - fmt.Println(dir) + defer os.RemoveAll(dir) var enc RecordEncoder // Create a dummy segment to bump the initial number. @@ -138,11 +137,10 @@ func TestCheckpoint(t *testing.T) { } testutil.Ok(t, w.Close()) - stats, err = Checkpoint(nil, w, 100, 106, func(x uint64) bool { + _, err = Checkpoint(nil, w, 100, 106, func(x uint64) bool { return x%2 == 0 }, last/2) testutil.Ok(t, err) - testutil.Equals(t, 106, stats.HighSegment) // Only the new checkpoint should be left. 
files, err := fileutil.ReadDir(dir) diff --git a/db.go b/db.go index fcfbeeeb29..df17991ebd 100644 --- a/db.go +++ b/db.go @@ -37,6 +37,7 @@ import ( "github.com/prometheus/tsdb/chunkenc" "github.com/prometheus/tsdb/fileutil" "github.com/prometheus/tsdb/labels" + "github.com/prometheus/tsdb/wal" "golang.org/x/sync/errgroup" ) @@ -221,18 +222,18 @@ func Open(dir string, l log.Logger, r prometheus.Registerer, opts *Options) (db return nil, errors.Wrap(err, "create leveled compactor") } - wal, err := OpenSegmentWAL(filepath.Join(dir, "wal"), l, opts.WALFlushInterval, r) + wlog, err := wal.New(l, r, filepath.Join(dir, "wal")) if err != nil { return nil, err } - db.head, err = NewHead(r, l, wal, opts.BlockRanges[0]) + db.head, err = NewHead(r, l, wlog, opts.BlockRanges[0]) if err != nil { return nil, err } if err := db.reload(); err != nil { return nil, err } - if err := db.head.ReadWAL(); err != nil { + if err := db.head.Init(); err != nil { return nil, errors.Wrap(err, "read WAL") } diff --git a/head.go b/head.go index 372842865c..4e12369dad 100644 --- a/head.go +++ b/head.go @@ -15,6 +15,7 @@ package tsdb import ( "math" + "path/filepath" "runtime" "sort" "strings" @@ -30,6 +31,7 @@ import ( "github.com/prometheus/tsdb/chunks" "github.com/prometheus/tsdb/index" "github.com/prometheus/tsdb/labels" + "github.com/prometheus/tsdb/wal" ) var ( @@ -53,9 +55,10 @@ var ( type Head struct { chunkRange int64 metrics *headMetrics - wal WAL + wal *wal.WAL logger log.Logger appendPool sync.Pool + bytesPool sync.Pool minTime, maxTime int64 lastSeriesID uint64 @@ -169,13 +172,10 @@ func newHeadMetrics(h *Head, r prometheus.Registerer) *headMetrics { } // NewHead opens the head block in dir. 
-func NewHead(r prometheus.Registerer, l log.Logger, wal WAL, chunkRange int64) (*Head, error) { +func NewHead(r prometheus.Registerer, l log.Logger, wal *wal.WAL, chunkRange int64) (*Head, error) { if l == nil { l = log.NewNopLogger() } - if wal == nil { - wal = NopWAL() - } if chunkRange < 1 { return nil, errors.Errorf("invalid chunk range %d", chunkRange) } @@ -206,6 +206,8 @@ func (h *Head) processWALSamples( ) (unknownRefs uint64) { defer close(output) + maxt := h.MaxTime() + for samples := range input { for _, s := range samples { if s.T < mint || s.Ref%total != partition { @@ -221,17 +223,27 @@ func (h *Head) processWALSamples( h.metrics.chunksCreated.Inc() h.metrics.chunks.Inc() } + if s.T > maxt { + maxt = s.T + } } output <- samples } + + for { + ht := h.MaxTime() + if maxt <= ht { + break + } + if atomic.CompareAndSwapInt64(&h.maxTime, ht, maxt) { + break + } + } + return unknownRefs } -// ReadWAL initializes the head by consuming the write ahead log. -func (h *Head) ReadWAL() error { - defer h.postings.EnsureOrder() - - r := h.wal.Reader() +func (h *Head) loadWAL(r *wal.Reader) error { mint := h.MinTime() // Track number of samples that referenced a series we don't know about @@ -263,49 +275,71 @@ func (h *Head) ReadWAL() error { input = output } - // TODO(fabxc): series entries spread between samples can starve the sample workers. - // Even with bufferd channels, this can impact startup time with lots of series churn. - // We must not paralellize series creation itself but could make the indexing asynchronous. 
- seriesFunc := func(series []RefSeries) { - for _, s := range series { - h.getOrCreateWithID(s.Ref, s.Labels.Hash(), s.Labels) + var ( + dec RecordDecoder + series []RefSeries + samples []RefSample + tstones []Stone + ) + for r.Next() { + series, samples, tstones = series[:0], samples[:0], tstones[:0] + rec := r.Record() - if h.lastSeriesID < s.Ref { - h.lastSeriesID = s.Ref + switch dec.Type(rec) { + case RecordSeries: + series, err := dec.Series(rec, series) + if err != nil { + return errors.Wrap(err, "decode series") } - } - } - samplesFunc := func(samples []RefSample) { - // We split up the samples into chunks of 5000 samples or less. - // With O(300 * #cores) in-flight sample batches, large scrapes could otherwise - // cause thousands of very large in flight buffers occupying large amounts - // of unused memory. - for len(samples) > 0 { - n := 5000 - if len(samples) < n { - n = len(samples) - } - var buf []RefSample - select { - case buf = <-input: - default: - } - firstInput <- append(buf[:0], samples[:n]...) - samples = samples[n:] - } - } - deletesFunc := func(stones []Stone) { - for _, s := range stones { - for _, itv := range s.intervals { - if itv.Maxt < mint { - continue + for _, s := range series { + h.getOrCreateWithID(s.Ref, s.Labels.Hash(), s.Labels) + + if h.lastSeriesID < s.Ref { + h.lastSeriesID = s.Ref } - h.tombstones.addInterval(s.ref, itv) } + case RecordSamples: + samples, err := dec.Samples(rec, samples) + if err != nil { + return errors.Wrap(err, "decode samples") + } + // We split up the samples into chunks of 5000 samples or less. + // With O(300 * #cores) in-flight sample batches, large scrapes could otherwise + // cause thousands of very large in flight buffers occupying large amounts + // of unused memory. + for len(samples) > 0 { + n := 5000 + if len(samples) < n { + n = len(samples) + } + var buf []RefSample + select { + case buf = <-input: + default: + } + firstInput <- append(buf[:0], samples[:n]...) 
+ samples = samples[n:] + } + case RecordTombstones: + tstones, err := dec.Tombstones(rec, tstones) + if err != nil { + return errors.Wrap(err, "decode tombstones") + } + for _, s := range tstones { + for _, itv := range s.intervals { + if itv.Maxt < mint { + continue + } + h.tombstones.addInterval(s.ref, itv) + } + } + default: + return errors.Errorf("invalid record type %v", dec.Type(rec)) } } - - err := r.Read(seriesFunc, samplesFunc, deletesFunc) + if r.Err() != nil { + return errors.Wrap(r.Err(), "read records") + } // Signal termination to first worker and wait for last one to close its output channel. close(firstInput) @@ -313,16 +347,58 @@ func (h *Head) ReadWAL() error { } wg.Wait() - if err != nil { - return errors.Wrap(err, "consume WAL") - } if unknownRefs > 0 { - level.Warn(h.logger).Log("msg", "unknown series references in WAL samples", "count", unknownRefs) + level.Warn(h.logger).Log("msg", "unknown series references", "count", unknownRefs) } return nil } -// Truncate removes all data before mint from the head block and truncates its WAL. +// Init loads data from the write ahead log and prepares the head for writes. +func (h *Head) Init() error { + defer h.postings.EnsureOrder() + + if h.wal == nil { + return nil + } + + // Backfill the checkpoint first if it exists. + cp, n, err := LastCheckpoint(h.wal.Dir()) + if err != nil && err != ErrNotFound { + return errors.Wrap(err, "find last checkpoint") + } + if err == nil { + sr, err := wal.NewSegmentsReader(filepath.Join(h.wal.Dir(), cp)) + if err != nil { + return errors.Wrap(err, "open checkpoint") + } + defer sr.Close() + + // A corrupted checkpoint is a hard error for now and requires user + // intervention. There's likely little data that can be recovered anyway. 
+ if err := h.loadWAL(wal.NewReader(sr)); err != nil { + return errors.Wrap(err, "backfill checkpoint") + } + n++ + } + + // Backfill segments from the last checkpoint onwards + sr, err := wal.NewSegmentsRangeReader(h.wal.Dir(), n, -1) + if err != nil { + return errors.Wrap(err, "open WAL segments") + } + defer sr.Close() + + err = h.loadWAL(wal.NewReader(sr)) + if err == nil { + return nil + } + if err := h.wal.Repair(err); err != nil { + return errors.Wrap(err, "repair corrupted WAL") + } + return nil +} + +// Truncate removes old data before mint from the head. func (h *Head) Truncate(mint int64) error { initialize := h.MinTime() == math.MinInt64 @@ -348,18 +424,37 @@ func (h *Head) Truncate(mint int64) error { level.Info(h.logger).Log("msg", "head GC completed", "duration", time.Since(start)) h.metrics.gcDuration.Observe(time.Since(start).Seconds()) + if h.wal == nil { + return nil + } start = time.Now() + m, n, err := h.wal.Segments() + if err != nil { + return errors.Wrap(err, "get segment range") + } + n-- // Never consider last segment for checkpoint. + if n < 0 { + return nil // no segments yet. + } + // The lower third of segments should contain mostly obsolete samples. + // If we have too few segments, it's not worth checkpointing yet. 
+ n = m + (n-m)/3 + if n <= m { + return nil + } + keep := func(id uint64) bool { return h.series.getByID(id) != nil } - if err := h.wal.Truncate(mint, keep); err == nil { - level.Info(h.logger).Log("msg", "WAL truncation completed", "duration", time.Since(start)) - } else { - level.Error(h.logger).Log("msg", "WAL truncation failed", "err", err, "duration", time.Since(start)) + if _, err = Checkpoint(h.logger, h.wal, m, n, keep, mint); err != nil { + return errors.Wrap(err, "create checkpoint") } h.metrics.walTruncateDuration.Observe(time.Since(start).Seconds()) + level.Info(h.logger).Log("msg", "WAL checkpoint complete", + "low", m, "high", n, "duration", time.Since(start)) + return nil } @@ -468,6 +563,18 @@ func (h *Head) putAppendBuffer(b []RefSample) { h.appendPool.Put(b[:0]) } +func (h *Head) getBytesBuffer() []byte { + b := h.bytesPool.Get() + if b == nil { + return make([]byte, 0, 1024) + } + return b.([]byte) +} + +func (h *Head) putBytesBuffer(b []byte) { + h.bytesPool.Put(b[:0]) +} + type headAppender struct { head *Head mint, maxt int64 @@ -520,15 +627,42 @@ func (a *headAppender) AddFast(ref uint64, t int64, v float64) error { return nil } +func (a *headAppender) log() error { + if a.head.wal == nil { + return nil + } + + buf := a.head.getBytesBuffer() + defer func() { a.head.putBytesBuffer(buf) }() + + var rec []byte + var enc RecordEncoder + + if len(a.series) > 0 { + rec = enc.Series(a.series, buf) + buf = rec[:0] + + if err := a.head.wal.Log(rec); err != nil { + return errors.Wrap(err, "log series") + } + } + if len(a.samples) > 0 { + rec = enc.Samples(a.samples, buf) + buf = rec[:0] + + if err := a.head.wal.Log(rec); err != nil { + return errors.Wrap(err, "log samples") + } + } + return nil +} + func (a *headAppender) Commit() error { defer a.head.metrics.activeAppenders.Dec() defer a.head.putAppendBuffer(a.samples) - if err := a.head.wal.LogSeries(a.series); err != nil { - return err - } - if err := a.head.wal.LogSamples(a.samples); err != nil { 
- return errors.Wrap(err, "WAL log samples") + if err := a.log(); err != nil { + return errors.Wrap(err, "write to WAL") } total := len(a.samples) @@ -568,7 +702,8 @@ func (a *headAppender) Rollback() error { // Series are created in the head memory regardless of rollback. Thus we have // to log them to the WAL in any case. - return a.head.wal.LogSeries(a.series) + a.samples = nil + return a.log() } // Delete all samples in the range of [mint, maxt] for series that satisfy the given @@ -601,8 +736,12 @@ func (h *Head) Delete(mint, maxt int64, ms ...labels.Matcher) error { if p.Err() != nil { return p.Err() } - if err := h.wal.LogDeletes(stones); err != nil { - return err + var enc RecordEncoder + + if h.wal != nil { + if err := h.wal.Log(enc.Tombstones(stones, nil)); err != nil { + return err + } } for _, s := range stones { h.tombstones.addInterval(s.ref, s.intervals[0]) @@ -694,6 +833,9 @@ func (h *Head) MaxTime() int64 { // Close flushes the WAL and closes the head. func (h *Head) Close() error { + if h.wal == nil { + return nil + } return h.wal.Close() } diff --git a/head_test.go b/head_test.go index 9a8c893644..b06a66c26c 100644 --- a/head_test.go +++ b/head_test.go @@ -14,7 +14,9 @@ package tsdb import ( + "io/ioutil" "math/rand" + "os" "testing" "github.com/prometheus/tsdb/chunkenc" @@ -22,6 +24,7 @@ import ( "github.com/prometheus/tsdb/index" "github.com/prometheus/tsdb/labels" "github.com/prometheus/tsdb/testutil" + "github.com/prometheus/tsdb/wal" ) func BenchmarkCreateSeries(b *testing.B) { @@ -42,42 +45,50 @@ func BenchmarkCreateSeries(b *testing.B) { } } -type memoryWAL struct { - nopWAL - entries []interface{} -} - -func (w *memoryWAL) LogSeries(s []RefSeries) error { - w.entries = append(w.entries, s) - return nil -} - -func (w *memoryWAL) LogSamples(s []RefSample) error { - w.entries = append(w.entries, s) - return nil -} - -func (w *memoryWAL) LogDeletes(s []Stone) error { - w.entries = append(w.entries, s) - return nil -} - -func (w *memoryWAL) 
Reader() WALReader { - return w -} - -func (w *memoryWAL) Read(series func([]RefSeries), samples func([]RefSample), deletes func([]Stone)) error { - for _, e := range w.entries { - switch v := e.(type) { +func populateTestWAL(t testing.TB, w *wal.WAL, recs []interface{}) { + var enc RecordEncoder + for _, r := range recs { + switch v := r.(type) { case []RefSeries: - series(v) + testutil.Ok(t, w.Log(enc.Series(v, nil))) case []RefSample: - samples(v) + testutil.Ok(t, w.Log(enc.Samples(v, nil))) case []Stone: - deletes(v) + testutil.Ok(t, w.Log(enc.Tombstones(v, nil))) } } - return nil +} + +func readTestWAL(t testing.TB, dir string) (recs []interface{}) { + sr, err := wal.NewSegmentsReader(dir) + testutil.Ok(t, err) + defer sr.Close() + + var dec RecordDecoder + r := wal.NewReader(sr) + + for r.Next() { + rec := r.Record() + + switch dec.Type(rec) { + case RecordSeries: + series, err := dec.Series(rec, nil) + testutil.Ok(t, err) + recs = append(recs, series) + case RecordSamples: + samples, err := dec.Samples(rec, nil) + testutil.Ok(t, err) + recs = append(recs, samples) + case RecordTombstones: + tstones, err := dec.Tombstones(rec, nil) + testutil.Ok(t, err) + recs = append(recs, tstones) + default: + t.Fatalf("unknown record type") + } + } + testutil.Ok(t, r.Err()) + return recs } func TestHead_ReadWAL(t *testing.T) { @@ -100,13 +111,19 @@ func TestHead_ReadWAL(t *testing.T) { {Ref: 50, T: 101, V: 6}, }, } - wal := &memoryWAL{entries: entries} + dir, err := ioutil.TempDir("", "test_read_wal") + testutil.Ok(t, err) + defer os.RemoveAll(dir) - head, err := NewHead(nil, nil, wal, 1000) + w, err := wal.New(nil, nil, dir) + testutil.Ok(t, err) + populateTestWAL(t, w, entries) + + head, err := NewHead(nil, nil, w, 1000) testutil.Ok(t, err) defer head.Close() - testutil.Ok(t, head.ReadWAL()) + testutil.Ok(t, head.Init()) testutil.Equals(t, uint64(100), head.lastSeriesID) s10 := head.series.getByID(10) @@ -259,13 +276,19 @@ func TestHeadDeleteSeriesWithoutSamples(t 
*testing.T) { {Ref: 50, T: 90, V: 1}, }, } - wal := &memoryWAL{entries: entries} + dir, err := ioutil.TempDir("", "test_delete_series") + testutil.Ok(t, err) + defer os.RemoveAll(dir) - head, err := NewHead(nil, nil, wal, 1000) + w, err := wal.New(nil, nil, dir) + testutil.Ok(t, err) + populateTestWAL(t, w, entries) + + head, err := NewHead(nil, nil, w, 1000) testutil.Ok(t, err) defer head.Close() - testutil.Ok(t, head.ReadWAL()) + testutil.Ok(t, head.Init()) testutil.Ok(t, head.Delete(0, 100, labels.NewEqualMatcher("a", "1"))) } @@ -705,7 +728,7 @@ func TestMemSeries_append(t *testing.T) { func TestGCChunkAccess(t *testing.T) { // Put a chunk, select it. GC it and then access it. - h, err := NewHead(nil, nil, NopWAL(), 1000) + h, err := NewHead(nil, nil, nil, 1000) testutil.Ok(t, err) defer h.Close() @@ -745,7 +768,7 @@ func TestGCChunkAccess(t *testing.T) { func TestGCSeriesAccess(t *testing.T) { // Put a series, select it. GC it and then access it. - h, err := NewHead(nil, nil, NopWAL(), 1000) + h, err := NewHead(nil, nil, nil, 1000) testutil.Ok(t, err) defer h.Close() @@ -786,7 +809,12 @@ func TestGCSeriesAccess(t *testing.T) { } func TestHead_LogRollback(t *testing.T) { - w := &memoryWAL{} + dir, err := ioutil.TempDir("", "wal_rollback") + testutil.Ok(t, err) + defer os.RemoveAll(dir) + + w, err := wal.New(nil, nil, dir) + testutil.Ok(t, err) h, err := NewHead(nil, nil, w, 1000) testutil.Ok(t, err) @@ -795,9 +823,11 @@ func TestHead_LogRollback(t *testing.T) { testutil.Ok(t, err) testutil.Ok(t, app.Rollback()) - testutil.Equals(t, 1, len(w.entries)) + recs := readTestWAL(t, w.Dir()) - series, ok := w.entries[0].([]RefSeries) - testutil.Assert(t, ok, "expected series record but got %+v", w.entries[0]) + testutil.Equals(t, 1, len(recs)) + + series, ok := recs[0].([]RefSeries) + testutil.Assert(t, ok, "expected series record but got %+v", recs[0]) testutil.Equals(t, series, []RefSeries{{Ref: 1, Labels: labels.FromStrings("a", "b")}}) } From 
7841d417b36464a3fccc57164b7637d7465d989c Mon Sep 17 00:00:00 2001 From: Fabian Reinartz Date: Tue, 22 May 2018 08:51:20 -0400 Subject: [PATCH 05/17] Ensure blocks are time-ordered in memory We assume in multiple places that the block list held by DB has blocks sequential by time. A regression caused us to hold them ordered by ULID, i.e. by creation time instead. Signed-off-by: Fabian Reinartz --- db.go | 5 ----- db_test.go | 24 ++++++++++++++++-------- 2 files changed, 16 insertions(+), 13 deletions(-) diff --git a/db.go b/db.go index df17991ebd..1b508e0666 100644 --- a/db.go +++ b/db.go @@ -502,7 +502,6 @@ func (db *DB) reload() (err error) { sort.Slice(blocks, func(i, j int) bool { return blocks[i].Meta().MinTime < blocks[j].Meta().MinTime }) - if err := validateBlockSequence(blocks); err != nil { return errors.Wrap(err, "invalid block sequence") } @@ -597,10 +596,6 @@ func OverlappingBlocks(bm []BlockMeta) Overlaps { if len(bm) <= 1 { return nil } - sort.Slice(bm, func(i, j int) bool { - return bm[i].MinTime < bm[j].MinTime - }) - var ( overlaps [][]BlockMeta diff --git a/db_test.go b/db_test.go index 205b0f8440..698bf78b00 100644 --- a/db_test.go +++ b/db_test.go @@ -1025,33 +1025,41 @@ func TestOverlappingBlocksDetectsAllOverlaps(t *testing.T) { testutil.Assert(t, len(OverlappingBlocks(metas)) == 0, "we found unexpected overlaps") - // Add overlapping blocks. + // Add overlapping blocks. We've to establish order again since we aren't interested + // in trivial overlaps caused by unorderedness. + add := func(ms ...BlockMeta) []BlockMeta { + repl := append(append([]BlockMeta{}, metas...), ms...) + sort.Slice(repl, func(i, j int) bool { + return repl[i].MinTime < repl[j].MinTime + }) + return repl + } // o1 overlaps with 10-20. o1 := BlockMeta{MinTime: 15, MaxTime: 17} testutil.Equals(t, Overlaps{ {Min: 15, Max: 17}: {metas[1], o1}, - }, OverlappingBlocks(append(metas, o1))) + }, OverlappingBlocks(add(o1))) // o2 overlaps with 20-30 and 30-40. 
o2 := BlockMeta{MinTime: 21, MaxTime: 31} testutil.Equals(t, Overlaps{ {Min: 21, Max: 30}: {metas[2], o2}, {Min: 30, Max: 31}: {o2, metas[3]}, - }, OverlappingBlocks(append(metas, o2))) + }, OverlappingBlocks(add(o2))) // o3a and o3b overlaps with 30-40 and each other. o3a := BlockMeta{MinTime: 33, MaxTime: 39} o3b := BlockMeta{MinTime: 34, MaxTime: 36} testutil.Equals(t, Overlaps{ {Min: 34, Max: 36}: {metas[3], o3a, o3b}, - }, OverlappingBlocks(append(metas, o3a, o3b))) + }, OverlappingBlocks(add(o3a, o3b))) // o4 is 1:1 overlap with 50-60. o4 := BlockMeta{MinTime: 50, MaxTime: 60} testutil.Equals(t, Overlaps{ {Min: 50, Max: 60}: {metas[5], o4}, - }, OverlappingBlocks(append(metas, o4))) + }, OverlappingBlocks(add(o4))) // o5 overlaps with 60-70, 70-80 and 80-90. o5 := BlockMeta{MinTime: 61, MaxTime: 85} @@ -1059,7 +1067,7 @@ func TestOverlappingBlocksDetectsAllOverlaps(t *testing.T) { {Min: 61, Max: 70}: {metas[6], o5}, {Min: 70, Max: 80}: {o5, metas[7]}, {Min: 80, Max: 85}: {o5, metas[8]}, - }, OverlappingBlocks(append(metas, o5))) + }, OverlappingBlocks(add(o5))) // o6a overlaps with 90-100, 100-110 and o6b, o6b overlaps with 90-100 and o6a. o6a := BlockMeta{MinTime: 92, MaxTime: 105} @@ -1067,7 +1075,7 @@ func TestOverlappingBlocksDetectsAllOverlaps(t *testing.T) { testutil.Equals(t, Overlaps{ {Min: 94, Max: 99}: {metas[9], o6a, o6b}, {Min: 100, Max: 105}: {o6a, metas[10]}, - }, OverlappingBlocks(append(metas, o6a, o6b))) + }, OverlappingBlocks(add(o6a, o6b))) // All together. 
testutil.Equals(t, Overlaps{ @@ -1077,7 +1085,7 @@ func TestOverlappingBlocksDetectsAllOverlaps(t *testing.T) { {Min: 50, Max: 60}: {metas[5], o4}, {Min: 61, Max: 70}: {metas[6], o5}, {Min: 70, Max: 80}: {o5, metas[7]}, {Min: 80, Max: 85}: {o5, metas[8]}, {Min: 94, Max: 99}: {metas[9], o6a, o6b}, {Min: 100, Max: 105}: {o6a, metas[10]}, - }, OverlappingBlocks(append(metas, o1, o2, o3a, o3b, o4, o5, o6a, o6b))) + }, OverlappingBlocks(add(o1, o2, o3a, o3b, o4, o5, o6a, o6b))) // Additional case. var nc1 []BlockMeta From d951140ab855b08f3d0bca4992ed9a46baf33f09 Mon Sep 17 00:00:00 2001 From: Fabian Reinartz Date: Thu, 24 May 2018 15:51:47 -0400 Subject: [PATCH 06/17] wal: avoid heap allocation in WAL reader The buffers we allocated were escaping to the heap, resulting in large memory usage spikes during startup and checkpointing in Prometheus. This attaches the buffer to the reader object to prevent this. Signed-off-by: Fabian Reinartz --- wal/wal.go | 26 +++++++++++++++----------- 1 file changed, 15 insertions(+), 11 deletions(-) diff --git a/wal/wal.go b/wal/wal.go index ed979998c5..4ba3aec4d6 100644 --- a/wal/wal.go +++ b/wal/wal.go @@ -621,14 +621,6 @@ func NewSegmentsRangeReader(dir string, m, n int) (io.ReadCloser, error) { return newSegmentBufReader(segs...), nil } -// Reader reads WAL records from an io.Reader. -type Reader struct { - rdr io.Reader - err error - rec []byte - total int // total bytes processed. -} - // segmentBufReader is a buffered reader that reads in multiples of pages. // The main purpose is that we are able to track segment and offset for // corruption reporting. @@ -683,6 +675,15 @@ func (r *segmentBufReader) Read(b []byte) (n int, err error) { return n, nil } +// Reader reads WAL records from an io.Reader. +type Reader struct { + rdr io.Reader + err error + rec []byte + buf [pageSize]byte + total int // total bytes processed. +} + // NewReader returns a new reader. 
func NewReader(r io.Reader) *Reader { return &Reader{rdr: r} @@ -700,8 +701,11 @@ func (r *Reader) Next() bool { } func (r *Reader) next() (err error) { - var hdr [recordHeaderSize]byte - var buf [pageSize]byte + // We have to use r.buf since allocating byte arrays here fails escape + // analysis and ends up on the heap, even though it seemingly should not. + hdr := r.buf[:7] + buf := r.buf[7:] + r.rec = r.rec[:0] i := 0 @@ -745,7 +749,7 @@ func (r *Reader) next() (err error) { crc = binary.BigEndian.Uint32(hdr[3:]) ) - if length > pageSize { + if length > pageSize-recordHeaderSize { return errors.Errorf("invalid record size %d", length) } n, err = io.ReadFull(r.rdr, buf[:length]) From 3f538817f80230a4dbeeba2b2170125e1cd8899d Mon Sep 17 00:00:00 2001 From: Fabian Reinartz Date: Tue, 29 May 2018 11:46:06 -0400 Subject: [PATCH 07/17] move WAL lock Signed-off-by: Fabian Reinartz --- wal/wal.go | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/wal/wal.go b/wal/wal.go index 4ba3aec4d6..89369195b5 100644 --- a/wal/wal.go +++ b/wal/wal.go @@ -429,6 +429,8 @@ func (w *WAL) pagesPerSegment() int { // Log writes the records into the log. // Multiple records can be passed at once to reduce writes and increase throughput. func (w *WAL) Log(recs ...[]byte) error { + w.mtx.Lock() + defer w.mtx.Unlock() // Callers could just implement their own list record format but adding // a bit of extra logic here frees them from that overhead. for i, r := range recs { @@ -442,9 +444,6 @@ func (w *WAL) Log(recs ...[]byte) error { // log writes rec to the log and forces a flush of the current page if its // the final record of a batch. func (w *WAL) log(rec []byte, final bool) error { - w.mtx.Lock() - defer w.mtx.Unlock() - // If the record is too big to fit within pages in the current // segment, terminate the active segment and advance to the next one. // This ensures that records do not cross segment boundaries. 
From 0ad2b8a349b4099efff0098585234010e258dca8 Mon Sep 17 00:00:00 2001 From: Fabian Reinartz Date: Wed, 30 May 2018 11:05:30 -0400 Subject: [PATCH 08/17] docs: add new WAL format Signed-off-by: Fabian Reinartz --- docs/format/wal.md | 72 ++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 72 insertions(+) create mode 100644 docs/format/wal.md diff --git a/docs/format/wal.md b/docs/format/wal.md new file mode 100644 index 0000000000..f0daba24de --- /dev/null +++ b/docs/format/wal.md @@ -0,0 +1,72 @@ +# WAL Disk Format + +The write ahead log operates in segments that that are numbered and sequential, +e.g. `000000`, `000001`, `000002`, etc., and are limited to 128MB by default. +A segment is written to in pages of 32KB. Only the last page of the most recent segment +may be partial. A WAL record is an opaque byte slice that gets split up into sub-records +should it exceed the remaining space of the current page. Records are never split across +segment boundaries. +The encoding of pages is largely borrowed from [LevelDB's/RocksDB's wirte ahead log.][1] + +Notable deviations are that the record fragment is encoded as: + +┌───────────┬──────────┬────────────┬──────────────┐ +│ type <1b> │ len <2b> │ CRC32 <4b> │ data │ +└───────────┴──────────┴────────────┴──────────────┘ + +## Record encoding + +The records written to the write ahead log are encoded as follows: + +### Series records + +Series records encode the labels that identifier a series and its unique ID. + +┌────────────────────────────────────────────┐ +│ type = 1 <1b> │ +├────────────────────────────────────────────┤ +│ ┌─────────┬──────────────────────────────┐ │ +│ │ id <8b> │ n = len(labels) │ │ +│ ├─────────┴────────────┬─────────────────┤ │ +│ │ len(str_1) │ str_1 │ │ +│ ├──────────────────────┴─────────────────┤ │ +│ │ ... │ │ +│ ├───────────────────────┬────────────────┤ │ +│ │ len(str_2n) │ str_2n │ │ +│ └───────────────────────┴────────────────┘ │ +│ . . . 
│ +└────────────────────────────────────────────┘ + +### Sample records + +Sample records encode samples as a list of triples `(series_id, timestamp, value)`. +Series reference and timestamp are encoded as deltas w.r.t the first sample. + +┌──────────────────────────────────────────────────────────────────┐ +│ type = 2 <1b> │ +├──────────────────────────────────────────────────────────────────┤ +│ ┌────────────────────┬───────────────────────────┬─────────────┐ │ +│ │ id <8b> │ timestamp <8b> │ value <8b> │ │ +│ └────────────────────┴───────────────────────────┴─────────────┘ │ +│ ┌────────────────────┬───────────────────────────┬─────────────┐ │ +│ │ id_delta │ timestamp_delta │ value <8b> │ │ +│ └────────────────────┴───────────────────────────┴─────────────┘ │ +│ . . . │ +└──────────────────────────────────────────────────────────────────┘ + +### Tombstone records + +Tombstone records encode tombstones as a list of triples `(series_id, min_time, max_time)` +and specify an interval for which samples of a series got deleted. + + +┌─────────────────────────────────────────────────────┐ +│ type = 3 <1b> │ +├─────────────────────────────────────────────────────┤ +│ ┌─────────┬───────────────────┬───────────────────┐ │ +│ │ id <8b> │ min_time │ max_time │ │ +│ └─────────┴───────────────────┴───────────────────┘ │ +│ . . . 
│ +└─────────────────────────────────────────────────────┘ + +[1][https://github.com/facebook/rocksdb/wiki/Write-Ahead-Log-File-Format] \ No newline at end of file From 3e76f0163e4129175d6b8e2b4a2a4a7fd23b326d Mon Sep 17 00:00:00 2001 From: Fabian Reinartz Date: Mon, 18 Jun 2018 07:52:57 -0400 Subject: [PATCH 09/17] Address comments Signed-off-by: Fabian Reinartz --- checkpoint.go | 16 ++++++++-------- docs/format/wal.md | 16 ++++++++++++---- head.go | 2 +- wal/wal.go | 38 ++++++++++++++++---------------------- 4 files changed, 37 insertions(+), 35 deletions(-) diff --git a/checkpoint.go b/checkpoint.go index 2ab5f8d95c..87ff5597e3 100644 --- a/checkpoint.go +++ b/checkpoint.go @@ -35,9 +35,9 @@ type CheckpointStats struct { DroppedSeries int DroppedSamples int DroppedTombstones int - TotalSeries int - TotalSamples int - TotalTombstones int + TotalSeries int // Processed series including dropped ones. + TotalSamples int // Processed samples inlcuding dropped ones. + TotalTombstones int // Processed tombstones including droppes ones. } // LastCheckpoint returns the directory name of the most recent checkpoint. @@ -129,16 +129,16 @@ func Checkpoint(logger log.Logger, w *wal.WAL, m, n int, keep func(id uint64) bo sr = last } - segs, err := wal.NewSegmentsRangeReader(w.Dir(), m, n) + segsr, err := wal.NewSegmentsRangeReader(w.Dir(), m, n) if err != nil { return nil, errors.Wrap(err, "create segment reader") } - defer segs.Close() + defer segsr.Close() if sr != nil { - sr = io.MultiReader(sr, segs) + sr = io.MultiReader(sr, segsr) } else { - sr = segs + sr = segsr } } @@ -169,7 +169,7 @@ func Checkpoint(logger log.Logger, w *wal.WAL, m, n int, keep func(id uint64) bo // We don't reset the buffer since we batch up multiple records // before writing them to the checkpoint. - // Remember where the record for this iteration starts. + // Remember where the record for this iteration starts. 
start := len(buf) rec := r.Record() diff --git a/docs/format/wal.md b/docs/format/wal.md index f0daba24de..ca3fae39de 100644 --- a/docs/format/wal.md +++ b/docs/format/wal.md @@ -5,14 +5,17 @@ e.g. `000000`, `000001`, `000002`, etc., and are limited to 128MB by default. A segment is written to in pages of 32KB. Only the last page of the most recent segment may be partial. A WAL record is an opaque byte slice that gets split up into sub-records should it exceed the remaining space of the current page. Records are never split across -segment boundaries. -The encoding of pages is largely borrowed from [LevelDB's/RocksDB's wirte ahead log.][1] +segment boundaries. If a single record exceeds the default segment size, a segment with +a larger size will be created. +The encoding of pages is largely borrowed from [LevelDB's/RocksDB's write ahead log.][1] Notable deviations are that the record fragment is encoded as: +``` ┌───────────┬──────────┬────────────┬──────────────┐ │ type <1b> │ len <2b> │ CRC32 <4b> │ data │ └───────────┴──────────┴────────────┴──────────────┘ +``` ## Record encoding @@ -22,6 +25,7 @@ The records written to the write ahead log are encoded as follows: Series records encode the labels that identifier a series and its unique ID. +``` ┌────────────────────────────────────────────┐ │ type = 1 <1b> │ ├────────────────────────────────────────────┤ @@ -36,12 +40,14 @@ Series records encode the labels that identifier a series and its unique ID. │ └───────────────────────┴────────────────┘ │ │ . . . │ └────────────────────────────────────────────┘ +``` ### Sample records Sample records encode samples as a list of triples `(series_id, timestamp, value)`. Series reference and timestamp are encoded as deltas w.r.t the first sample. 
+``` ┌──────────────────────────────────────────────────────────────────┐ │ type = 2 <1b> │ ├──────────────────────────────────────────────────────────────────┤ @@ -53,13 +59,14 @@ Series reference and timestamp are encoded as deltas w.r.t the first sample. │ └────────────────────┴───────────────────────────┴─────────────┘ │ │ . . . │ └──────────────────────────────────────────────────────────────────┘ +``` ### Tombstone records Tombstone records encode tombstones as a list of triples `(series_id, min_time, max_time)` and specify an interval for which samples of a series got deleted. - +``` ┌─────────────────────────────────────────────────────┐ │ type = 3 <1b> │ ├─────────────────────────────────────────────────────┤ @@ -68,5 +75,6 @@ and specify an interval for which samples of a series got deleted. │ └─────────┴───────────────────┴───────────────────┘ │ │ . . . │ └─────────────────────────────────────────────────────┘ +``` -[1][https://github.com/facebook/rocksdb/wiki/Write-Ahead-Log-File-Format] \ No newline at end of file +[1][https://github.com/facebook/rocksdb/wiki/Write-Ahead-Log-File-Format] diff --git a/head.go b/head.go index 4e12369dad..61457911f1 100644 --- a/head.go +++ b/head.go @@ -438,7 +438,7 @@ func (h *Head) Truncate(mint int64) error { return nil // no segments yet. } // The lower third of segments should contain mostly obsolete samples. - // If we have too few segments, it's not worth checkpointing yet. + // If we have less than three segments, it's not worth checkpointing yet. 
n = m + (n-m)/3 if n <= m { return nil diff --git a/wal/wal.go b/wal/wal.go index 89369195b5..90228184ab 100644 --- a/wal/wal.go +++ b/wal/wal.go @@ -23,6 +23,7 @@ import ( "math" "os" "path/filepath" + "sort" "strconv" "sync" "time" @@ -35,9 +36,7 @@ import ( ) const ( - version = 1 defaultSegmentSize = 128 * 1024 * 1024 // 128 MB - maxRecordSize = 1 * 1024 * 1024 // 1MB pageSize = 32 * 1024 // 32KB recordHeaderSize = 7 ) @@ -94,7 +93,6 @@ func (e *CorruptionErr) Error() string { // OpenWriteSegment opens segment k in dir. The returned segment is ready for new appends. func OpenWriteSegment(dir string, k int) (*Segment, error) { - // Only .active segments are allowed to be opened for write. f, err := os.OpenFile(SegmentName(dir, k), os.O_WRONLY|os.O_APPEND, 0666) if err != nil { return nil, err @@ -127,7 +125,7 @@ func CreateSegment(dir string, k int) (*Segment, error) { return &Segment{File: f, i: k, dir: dir}, nil } -// OpenReadSegment opens the segment k in dir for reading. +// OpenReadSegment opens the segment with the given filename. func OpenReadSegment(fn string) (*Segment, error) { k, err := strconv.Atoi(filepath.Base(fn)) if err != nil { @@ -142,7 +140,7 @@ func OpenReadSegment(fn string) (*Segment, error) { // WAL is a write ahead log that stores records in segment files. // It must be read from start to end once before logging new data. -// If an errore occurs during read, the repair procedure must be called +// If an erroe occurs during read, the repair procedure must be called // before it's safe to do further writes. // // Segments are written to in pages of 32KB, with records possibly split @@ -244,23 +242,19 @@ Loop: case f := <-w.actorc: f() case donec := <-w.stopc: + close(w.actorc) defer close(donec) break Loop } } // Drain and process any remaining functions. - for { - select { - case f := <-w.actorc: - f() - default: - return - } + for f := range w.actorc { + f() } } // Repair attempts to repair the WAL based on the error. 
-// It discards all data behind the corruption +// It discards all data after the corruption. func (w *WAL) Repair(err error) error { // We could probably have a mode that only discards torn records right around // the corruption to preserve as data much as possible. @@ -333,7 +327,7 @@ func (w *WAL) Repair(err error) error { // SegmentName builds a segment name for the directory. func SegmentName(dir string, i int) string { - return filepath.Join(dir, fmt.Sprintf("%06d", i)) + return filepath.Join(dir, fmt.Sprintf("%08d", i)) } // nextSegment creates the next segment and closes the previous one. @@ -384,6 +378,7 @@ func (w *WAL) flushPage(clear bool) error { } p.flushed += n + // We flushed an entire page, prepare a new one. if clear { for i := range p.buf { p.buf[i] = 0 @@ -485,7 +480,7 @@ func (w *WAL) log(rec []byte, final bool) error { binary.BigEndian.PutUint16(buf[1:], uint16(len(part))) binary.BigEndian.PutUint32(buf[3:], crc) - copy(buf[7:], part) + copy(buf[recordHeaderSize:], part) p.alloc += len(part) + recordHeaderSize // If we wrote a full record, we can fit more records of the batch @@ -587,6 +582,9 @@ func listSegments(dir string) (refs []segmentRef, err error) { refs = append(refs, segmentRef{s: fn, n: k}) last = k } + sort.Slice(refs, func(i, j int) bool { + return refs[i].n < refs[j].n + }) return refs, nil } @@ -667,10 +665,6 @@ func (r *segmentBufReader) Read(b []byte) (n int, err error) { // Only unset more so we don't invalidate the current segment and // offset before the next read. r.more = false - // If no more segments are left, it's the end for the reader. - if len(r.segs) == 0 { - return n, io.EOF - } return n, nil } @@ -689,7 +683,7 @@ func NewReader(r io.Reader) *Reader { } // Next advances the reader to the next records and returns true if it exists. -// It must not be called once after it returned false. +// It must not be called again after it returned false. 
func (r *Reader) Next() bool { err := r.next() if errors.Cause(err) == io.EOF { @@ -702,8 +696,8 @@ func (r *Reader) Next() bool { func (r *Reader) next() (err error) { // We have to use r.buf since allocating byte arrays here fails escape // analysis and ends up on the heap, even though it seemingly should not. - hdr := r.buf[:7] - buf := r.buf[7:] + hdr := r.buf[:recordHeaderSize] + buf := r.buf[recordHeaderSize:] r.rec = r.rec[:0] From 1a5573b4ce832c47fbe9eba0512de2cf4edb5ae6 Mon Sep 17 00:00:00 2001 From: Fabian Reinartz Date: Sun, 27 May 2018 13:05:11 -0400 Subject: [PATCH 10/17] Migrate write ahead log On startup, rewrite the old write ahead log into the new format once. Signed-off-by: Fabian Reinartz --- db.go | 4 +++ head.go | 2 ++ wal.go | 88 +++++++++++++++++++++++++++++++++++++++++++++++ wal_test.go | 99 +++++++++++++++++++++++++++++++++++++++++++++++++++++ 4 files changed, 193 insertions(+) diff --git a/db.go b/db.go index 1b508e0666..e6a0a74b4b 100644 --- a/db.go +++ b/db.go @@ -192,6 +192,10 @@ func Open(dir string, l log.Logger, r prometheus.Registerer, opts *Options) (db if err := repairBadIndexVersion(l, dir); err != nil { return nil, err } + // Migrate old WAL. 
+ if err := MigrateWAL(l, filepath.Join(dir, "wal")); err != nil { + return nil, errors.Wrap(err, "migrate WAL") + } db = &DB{ dir: dir, diff --git a/head.go b/head.go index 61457911f1..f52fea726b 100644 --- a/head.go +++ b/head.go @@ -392,6 +392,8 @@ func (h *Head) Init() error { if err == nil { return nil } + level.Warn(h.logger).Log("msg", "encountered WAL error, attempting repair", "err", err) + if err := h.wal.Repair(err); err != nil { return errors.Wrap(err, "repair corrupted WAL") } diff --git a/wal.go b/wal.go index c1b9f6b062..773b642825 100644 --- a/wal.go +++ b/wal.go @@ -33,6 +33,7 @@ import ( "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/tsdb/fileutil" "github.com/prometheus/tsdb/labels" + "github.com/prometheus/tsdb/wal" ) // WALEntryType indicates what data a WAL entry contains. @@ -82,6 +83,8 @@ func newWalMetrics(wal *SegmentWAL, r prometheus.Registerer) *walMetrics { // WAL is a write ahead log that can log new series labels and samples. // It must be completely read before new entries are logged. +// +// DEPRECATED: use wal pkg combined with the record coders instead. type WAL interface { Reader() WALReader LogSeries([]RefSeries) error @@ -173,6 +176,8 @@ func newCRC32() hash.Hash32 { } // SegmentWAL is a write ahead log for series data. +// +// DEPRECATED: use wal pkg combined with the record coders instead. type SegmentWAL struct { mtx sync.Mutex metrics *walMetrics @@ -1206,3 +1211,86 @@ func (r *walReader) decodeDeletes(flag byte, b []byte, res *[]Stone) error { } return nil } + +// MigrateWAL rewrites the deprecated write ahead log into the new format. +func MigrateWAL(logger log.Logger, dir string) error { + // Detect whether we still have the old WAL. + fns, err := sequenceFiles(dir) + if err != nil && !os.IsNotExist(err) { + return errors.Wrap(err, "list sequence files") + } + if len(fns) == 0 { + return nil // No WAL at all yet. + } + // Check header of first segment. 
+ f, err := os.Open(fns[0]) + if err != nil { + return errors.Wrap(err, "check first existing segment") + } + var hdr [4]byte + if n, err := f.Read(hdr[:]); err != nil { + return errors.Wrap(err, "read header from first segment") + } else if n != 4 { + return errors.New("could not read full header from segment") + } + if binary.BigEndian.Uint32(hdr[:]) != WALMagic { + return nil // Not the old WAL anymore. + } + + level.Info(logger).Log("msg", "migrating WAL format") + + tmpdir := dir + ".tmp" + if err := os.RemoveAll(tmpdir); err != nil { + return errors.Wrap(err, "cleanup replacement dir") + } + repl, err := wal.New(logger, nil, tmpdir) + if err != nil { + return errors.Wrap(err, "open new WAL") + } + w, err := OpenSegmentWAL(dir, logger, time.Minute, nil) + if err != nil { + return errors.Wrap(err, "open old WAL") + } + rdr := w.Reader() + + var ( + enc RecordEncoder + b []byte + ) + decErr := rdr.Read( + func(s []RefSeries) { + if err != nil { + return + } + err = repl.Log(enc.Series(s, b[:0])) + }, + func(s []RefSample) { + if err != nil { + return + } + err = repl.Log(enc.Samples(s, b[:0])) + }, + func(s []Stone) { + if err != nil { + return + } + err = repl.Log(enc.Tombstones(s, b[:0])) + }, + ) + if decErr != nil { + return errors.Wrap(err, "decode old entries") + } + if err != nil { + return errors.Wrap(err, "write new entries") + } + if err := w.Close(); err != nil { + return errors.Wrap(err, "close old WAL") + } + if err := repl.Close(); err != nil { + return errors.Wrap(err, "close new WAL") + } + if err := fileutil.Rename(tmpdir, dir); err != nil { + return errors.Wrap(err, "replace old WAL") + } + return nil +} diff --git a/wal_test.go b/wal_test.go index 6d559f5d9d..680ebe06ab 100644 --- a/wal_test.go +++ b/wal_test.go @@ -19,6 +19,7 @@ import ( "io/ioutil" "math/rand" "os" + "path" "testing" "time" @@ -26,6 +27,7 @@ import ( "github.com/prometheus/tsdb/fileutil" "github.com/prometheus/tsdb/labels" "github.com/prometheus/tsdb/testutil" + 
"github.com/prometheus/tsdb/wal" ) func TestSegmentWAL_cut(t *testing.T) { @@ -431,3 +433,100 @@ func TestWALRestoreCorrupted(t *testing.T) { }) } } + +func TestMigrateWAL_Fuzz(t *testing.T) { + dir, err := ioutil.TempDir("", "walmigrate") + testutil.Ok(t, err) + defer os.RemoveAll(dir) + + wdir := path.Join(dir, "wal") + + // Should pass if no WAL exists yet. + testutil.Ok(t, MigrateWAL(nil, wdir)) + + oldWAL, err := OpenSegmentWAL(wdir, nil, time.Minute, nil) + testutil.Ok(t, err) + + // Write some data. + testutil.Ok(t, oldWAL.LogSeries([]RefSeries{ + {Ref: 100, Labels: labels.FromStrings("abc", "def", "123", "456")}, + {Ref: 1, Labels: labels.FromStrings("abc", "def2", "1234", "4567")}, + })) + testutil.Ok(t, oldWAL.LogSamples([]RefSample{ + {Ref: 1, T: 100, V: 200}, + {Ref: 2, T: 300, V: 400}, + })) + testutil.Ok(t, oldWAL.LogSeries([]RefSeries{ + {Ref: 200, Labels: labels.FromStrings("xyz", "def", "foo", "bar")}, + })) + testutil.Ok(t, oldWAL.LogSamples([]RefSample{ + {Ref: 3, T: 100, V: 200}, + {Ref: 4, T: 300, V: 400}, + })) + testutil.Ok(t, oldWAL.LogDeletes([]Stone{ + {ref: 1, intervals: []Interval{{100, 200}}}, + })) + + testutil.Ok(t, oldWAL.Close()) + + // Perform migration. + testutil.Ok(t, MigrateWAL(nil, wdir)) + + w, err := wal.New(nil, nil, wdir) + testutil.Ok(t, err) + + // We can properly write some new data after migration. + var enc RecordEncoder + testutil.Ok(t, w.Log(enc.Samples([]RefSample{ + {Ref: 500, T: 1, V: 1}, + }, nil))) + + testutil.Ok(t, w.Close()) + + // Read back all data. 
+ sr, err := wal.NewSegmentsReader(wdir) + testutil.Ok(t, err) + + r := wal.NewReader(sr) + var res []interface{} + var dec RecordDecoder + + for r.Next() { + rec := r.Record() + + switch dec.Type(rec) { + case RecordSeries: + s, err := dec.Series(rec, nil) + testutil.Ok(t, err) + res = append(res, s) + case RecordSamples: + s, err := dec.Samples(rec, nil) + testutil.Ok(t, err) + res = append(res, s) + case RecordTombstones: + s, err := dec.Tombstones(rec, nil) + testutil.Ok(t, err) + res = append(res, s) + default: + t.Fatalf("unknown record type %d", dec.Type(rec)) + } + } + testutil.Ok(t, r.Err()) + + testutil.Equals(t, []interface{}{ + []RefSeries{ + {Ref: 100, Labels: labels.FromStrings("abc", "def", "123", "456")}, + {Ref: 1, Labels: labels.FromStrings("abc", "def2", "1234", "4567")}, + }, + []RefSample{{Ref: 1, T: 100, V: 200}, {Ref: 2, T: 300, V: 400}}, + []RefSeries{ + {Ref: 200, Labels: labels.FromStrings("xyz", "def", "foo", "bar")}, + }, + []RefSample{{Ref: 3, T: 100, V: 200}, {Ref: 4, T: 300, V: 400}}, + []Stone{{ref: 1, intervals: []Interval{{100, 200}}}}, + []RefSample{{Ref: 500, T: 1, V: 1}}, + }, res) + + // Migrating an already migrated WAL shouldn't do anything. + testutil.Ok(t, MigrateWAL(nil, wdir)) +} From 92e1b209575e98c8fe8edc3a70ea437beaae2f22 Mon Sep 17 00:00:00 2001 From: Fabian Reinartz Date: Tue, 5 Jun 2018 04:21:27 -0400 Subject: [PATCH 11/17] Fix close handling Signed-off-by: Fabian Reinartz --- wal.go | 17 +++++++++++++---- 1 file changed, 13 insertions(+), 4 deletions(-) diff --git a/wal.go b/wal.go index 773b642825..0fcadb1fc1 100644 --- a/wal.go +++ b/wal.go @@ -1213,7 +1213,7 @@ func (r *walReader) decodeDeletes(flag byte, b []byte, res *[]Stone) error { } // MigrateWAL rewrites the deprecated write ahead log into the new format. -func MigrateWAL(logger log.Logger, dir string) error { +func MigrateWAL(logger log.Logger, dir string) (err error) { // Detect whether we still have the old WAL. 
fns, err := sequenceFiles(dir) if err != nil && !os.IsNotExist(err) { @@ -1227,6 +1227,8 @@ func MigrateWAL(logger log.Logger, dir string) error { if err != nil { return errors.Wrap(err, "check first existing segment") } + defer f.Close() + var hdr [4]byte if n, err := f.Read(hdr[:]); err != nil { return errors.Wrap(err, "read header from first segment") @@ -1247,10 +1249,20 @@ func MigrateWAL(logger log.Logger, dir string) error { if err != nil { return errors.Wrap(err, "open new WAL") } + // We close it once already before as part of finalization. + // Do it once again in case of prior errors. + defer func() { + if err != nil { + repl.Close() + } + }() + w, err := OpenSegmentWAL(dir, logger, time.Minute, nil) if err != nil { return errors.Wrap(err, "open old WAL") } + defer w.Close() + rdr := w.Reader() var ( @@ -1283,9 +1295,6 @@ func MigrateWAL(logger log.Logger, dir string) error { if err != nil { return errors.Wrap(err, "write new entries") } - if err := w.Close(); err != nil { - return errors.Wrap(err, "close old WAL") - } if err := repl.Close(); err != nil { return errors.Wrap(err, "close new WAL") } From 22fd3ef24e3bf9450fbeec574e46049d0b70e458 Mon Sep 17 00:00:00 2001 From: Fabian Reinartz Date: Tue, 5 Jun 2018 04:50:20 -0400 Subject: [PATCH 12/17] Deal with zero-length segments Signed-off-by: Fabian Reinartz --- repair_test.go | 2 +- wal.go | 15 ++++++++++----- wal_test.go | 17 +++++++++++++++++ 3 files changed, 28 insertions(+), 6 deletions(-) diff --git a/repair_test.go b/repair_test.go index f4c9d20874..c80976002a 100644 --- a/repair_test.go +++ b/repair_test.go @@ -76,7 +76,7 @@ func TestRepairBadIndexVersion(t *testing.T) { } // On DB opening all blocks in the base dir should be repaired. 
- db, _ := Open("testdata/repair_index_version", nil, nil, nil) + db, err := Open("testdata/repair_index_version", nil, nil, nil) if err != nil { t.Fatal(err) } diff --git a/wal.go b/wal.go index 0fcadb1fc1..6685dbd063 100644 --- a/wal.go +++ b/wal.go @@ -1214,6 +1214,9 @@ func (r *walReader) decodeDeletes(flag byte, b []byte, res *[]Stone) error { // MigrateWAL rewrites the deprecated write ahead log into the new format. func MigrateWAL(logger log.Logger, dir string) (err error) { + if logger == nil { + logger = log.NewNopLogger() + } // Detect whether we still have the old WAL. fns, err := sequenceFiles(dir) if err != nil && !os.IsNotExist(err) { @@ -1222,7 +1225,8 @@ func MigrateWAL(logger log.Logger, dir string) (err error) { if len(fns) == 0 { return nil // No WAL at all yet. } - // Check header of first segment. + // Check header of first segment to see whether we are still dealing with an + // old WAL. f, err := os.Open(fns[0]) if err != nil { return errors.Wrap(err, "check first existing segment") @@ -1230,13 +1234,14 @@ func MigrateWAL(logger log.Logger, dir string) (err error) { defer f.Close() var hdr [4]byte - if n, err := f.Read(hdr[:]); err != nil { + if _, err := f.Read(hdr[:]); err != nil && err != io.EOF { return errors.Wrap(err, "read header from first segment") - } else if n != 4 { - return errors.New("could not read full header from segment") } + // If we cannot read the magic header for segments of the old WAL, abort. + // Either it's migrated already or there's a corruption issue with which + // we cannot deal here anyway. Subsequent attempts to open the WAL will error in that case. if binary.BigEndian.Uint32(hdr[:]) != WALMagic { - return nil // Not the old WAL anymore. 
+ return nil } level.Info(logger).Log("msg", "migrating WAL format") diff --git a/wal_test.go b/wal_test.go index 680ebe06ab..b16680a994 100644 --- a/wal_test.go +++ b/wal_test.go @@ -434,6 +434,23 @@ func TestWALRestoreCorrupted(t *testing.T) { } } +func TestMigrateWAL_Empty(t *testing.T) { + // The migration procedure must properly deal with a zero-length segment, + // which is valid in the new format. + dir, err := ioutil.TempDir("", "walmigrate") + testutil.Ok(t, err) + defer os.RemoveAll(dir) + + wdir := path.Join(dir, "wal") + + // Initialize empty WAL. + w, err := wal.New(nil, nil, wdir) + testutil.Ok(t, err) + testutil.Ok(t, w.Close()) + + testutil.Ok(t, MigrateWAL(nil, wdir)) +} + func TestMigrateWAL_Fuzz(t *testing.T) { dir, err := ioutil.TempDir("", "walmigrate") testutil.Ok(t, err) From 45071c657c20ded6dcc6f3480489aaf47007c914 Mon Sep 17 00:00:00 2001 From: Fabian Reinartz Date: Fri, 25 May 2018 17:19:32 -0400 Subject: [PATCH 13/17] Properly initialize head time This fixes various issues when initializing the head time range under different starting conditions. 
Signed-off-by: Fabian Reinartz --- db_test.go | 108 +++++++++++++++++++++++++++++++++++++++++++++++++++++ head.go | 81 +++++++++++++++++++++++----------------- 2 files changed, 155 insertions(+), 34 deletions(-) diff --git a/db_test.go b/db_test.go index 698bf78b00..9c175118eb 100644 --- a/db_test.go +++ b/db_test.go @@ -19,6 +19,7 @@ import ( "math" "math/rand" "os" + "path" "path/filepath" "sort" "testing" @@ -30,6 +31,7 @@ import ( "github.com/prometheus/tsdb/index" "github.com/prometheus/tsdb/labels" "github.com/prometheus/tsdb/testutil" + "github.com/prometheus/tsdb/wal" ) func openTestDB(t testing.TB, opts *Options) (db *DB, close func()) { @@ -1193,3 +1195,109 @@ func TestQuerierWithBoundaryChunks(t *testing.T) { count := len(q.(*querier).blocks) testutil.Assert(t, count == 2, "expected 2 blocks in querier, got %d", count) } + +func TestInitializeHeadTimestamp(t *testing.T) { + t.Run("clean", func(t *testing.T) { + dir, err := ioutil.TempDir("", "test_head_init") + testutil.Ok(t, err) + defer os.RemoveAll(dir) + + db, err := Open(dir, nil, nil, nil) + testutil.Ok(t, err) + + // Should be set to init values if no WAL or blocks exist so far. + testutil.Equals(t, int64(math.MaxInt64), db.head.MinTime()) + testutil.Equals(t, int64(math.MinInt64), db.head.MaxTime()) + + // First added sample initializes the writable range. 
+ app := db.Appender() + _, err = app.Add(labels.FromStrings("a", "b"), 1000, 1) + testutil.Ok(t, err) + + testutil.Equals(t, int64(1000), db.head.MinTime()) + testutil.Equals(t, int64(1000), db.head.MaxTime()) + }) + t.Run("wal-only", func(t *testing.T) { + dir, err := ioutil.TempDir("", "test_head_init") + testutil.Ok(t, err) + defer os.RemoveAll(dir) + + testutil.Ok(t, os.MkdirAll(path.Join(dir, "wal"), 0777)) + w, err := wal.New(nil, nil, path.Join(dir, "wal")) + testutil.Ok(t, err) + + var enc RecordEncoder + err = w.Log( + enc.Series([]RefSeries{ + {Ref: 123, Labels: labels.FromStrings("a", "1")}, + {Ref: 124, Labels: labels.FromStrings("a", "2")}, + }, nil), + enc.Samples([]RefSample{ + {Ref: 123, T: 5000, V: 1}, + {Ref: 124, T: 15000, V: 1}, + }, nil), + ) + testutil.Ok(t, err) + testutil.Ok(t, w.Close()) + + db, err := Open(dir, nil, nil, nil) + testutil.Ok(t, err) + + testutil.Equals(t, int64(5000), db.head.MinTime()) + testutil.Equals(t, int64(15000), db.head.MaxTime()) + }) + t.Run("existing-block", func(t *testing.T) { + dir, err := ioutil.TempDir("", "test_head_init") + testutil.Ok(t, err) + defer os.RemoveAll(dir) + + id := ulid.MustNew(2000, nil) + createEmptyBlock(t, path.Join(dir, id.String()), &BlockMeta{ + ULID: id, + MinTime: 1000, + MaxTime: 2000, + }) + + db, err := Open(dir, nil, nil, nil) + testutil.Ok(t, err) + + testutil.Equals(t, int64(2000), db.head.MinTime()) + testutil.Equals(t, int64(2000), db.head.MaxTime()) + }) + t.Run("existing-block-and-wal", func(t *testing.T) { + dir, err := ioutil.TempDir("", "test_head_init") + testutil.Ok(t, err) + defer os.RemoveAll(dir) + + id := ulid.MustNew(2000, nil) + createEmptyBlock(t, path.Join(dir, id.String()), &BlockMeta{ + ULID: id, + MinTime: 1000, + MaxTime: 6000, + }) + + testutil.Ok(t, os.MkdirAll(path.Join(dir, "wal"), 0777)) + w, err := wal.New(nil, nil, path.Join(dir, "wal")) + testutil.Ok(t, err) + + var enc RecordEncoder + err = w.Log( + enc.Series([]RefSeries{ + {Ref: 123, Labels: 
labels.FromStrings("a", "1")}, + {Ref: 124, Labels: labels.FromStrings("a", "2")}, + }, nil), + enc.Samples([]RefSample{ + {Ref: 123, T: 5000, V: 1}, + {Ref: 124, T: 15000, V: 1}, + }, nil), + ) + testutil.Ok(t, err) + testutil.Ok(t, w.Close()) + + db, err := Open(dir, nil, nil, nil) + testutil.Ok(t, err) + + testutil.Equals(t, int64(6000), db.head.MinTime()) + testutil.Equals(t, int64(15000), db.head.MaxTime()) + }) +} diff --git a/head.go b/head.go index 61457911f1..172d36e87a 100644 --- a/head.go +++ b/head.go @@ -183,7 +183,7 @@ func NewHead(r prometheus.Registerer, l log.Logger, wal *wal.WAL, chunkRange int wal: wal, logger: l, chunkRange: chunkRange, - minTime: math.MinInt64, + minTime: math.MaxInt64, maxTime: math.MinInt64, series: newStripeSeries(), values: map[string]stringset{}, @@ -200,17 +200,17 @@ func NewHead(r prometheus.Registerer, l log.Logger, wal *wal.WAL, chunkRange int // them on to other workers. // Samples before the mint timestamp are discarded. func (h *Head) processWALSamples( - mint int64, + minValidTime int64, partition, total uint64, input <-chan []RefSample, output chan<- []RefSample, ) (unknownRefs uint64) { defer close(output) - maxt := h.MaxTime() + mint, maxt := int64(math.MaxInt64), int64(math.MinInt64) for samples := range input { for _, s := range samples { - if s.T < mint || s.Ref%total != partition { + if s.T < minValidTime || s.Ref%total != partition { continue } ms := h.series.getByID(s.Ref) @@ -226,10 +226,27 @@ func (h *Head) processWALSamples( if s.T > maxt { maxt = s.T } + if s.T < mint { + mint = s.T + } } output <- samples } + h.updateMinMaxTime(mint, maxt) + return unknownRefs +} + +func (h *Head) updateMinMaxTime(mint, maxt int64) { + for { + lt := h.MinTime() + if mint >= lt { + break + } + if atomic.CompareAndSwapInt64(&h.minTime, lt, mint) { + break + } + } for { ht := h.MaxTime() if maxt <= ht { @@ -239,12 +256,15 @@ func (h *Head) processWALSamples( break } } - - return unknownRefs } func (h *Head) loadWAL(r 
*wal.Reader) error { - mint := h.MinTime() + minValidTime := h.MinTime() + // If the min time is still uninitialized (no persisted blocks yet), + // we accept all sample timestamps from the WAL. + if minValidTime == math.MaxInt64 { + minValidTime = math.MinInt64 + } // Track number of samples that referenced a series we don't know about // for error reporting. @@ -265,7 +285,7 @@ func (h *Head) loadWAL(r *wal.Reader) error { output := make(chan []RefSample, 300) go func(i int, input <-chan []RefSample, output chan<- []RefSample) { - unknown := h.processWALSamples(mint, uint64(i), uint64(n), input, output) + unknown := h.processWALSamples(minValidTime, uint64(i), uint64(n), input, output) atomic.AddUint64(&unknownRefs, unknown) wg.Done() }(i, input, output) @@ -327,7 +347,7 @@ func (h *Head) loadWAL(r *wal.Reader) error { } for _, s := range tstones { for _, itv := range s.intervals { - if itv.Maxt < mint { + if itv.Maxt < minValidTime { continue } h.tombstones.addInterval(s.ref, itv) @@ -400,9 +420,9 @@ func (h *Head) Init() error { // Truncate removes old data before mint from the head. func (h *Head) Truncate(mint int64) error { - initialize := h.MinTime() == math.MinInt64 + initialize := h.MinTime() == math.MaxInt64 - if h.MinTime() >= mint { + if h.MinTime() >= mint && !initialize { return nil } atomic.StoreInt64(&h.minTime, mint) @@ -462,10 +482,7 @@ func (h *Head) Truncate(mint int64) error { // for a compltely fresh head with an empty WAL. // Returns true if the initialization took an effect. func (h *Head) initTime(t int64) (initialized bool) { - // In the init state, the head has a high timestamp of math.MinInt64. - mint, _ := rangeForTimestamp(t, h.chunkRange) - - if !atomic.CompareAndSwapInt64(&h.minTime, math.MinInt64, mint) { + if !atomic.CompareAndSwapInt64(&h.minTime, math.MaxInt64, t) { return false } // Ensure that max time is initialized to at least the min time we just set. 
@@ -536,7 +553,7 @@ func (h *Head) Appender() Appender { // The head cache might not have a starting point yet. The init appender // picks up the first appended timestamp as the base. - if h.MinTime() == math.MinInt64 { + if h.MinTime() == math.MaxInt64 { return &initAppender{head: h} } return h.appender() @@ -544,10 +561,11 @@ func (h *Head) Appender() Appender { func (h *Head) appender() *headAppender { return &headAppender{ - head: h, - mint: h.MaxTime() - h.chunkRange/2, - maxt: math.MinInt64, - samples: h.getAppendBuffer(), + head: h, + minValidTime: h.MaxTime() - h.chunkRange/2, + mint: math.MaxInt64, + maxt: math.MinInt64, + samples: h.getAppendBuffer(), } } @@ -576,15 +594,16 @@ func (h *Head) putBytesBuffer(b []byte) { } type headAppender struct { - head *Head - mint, maxt int64 + head *Head + minValidTime int64 // No samples below this timestamp are allowed. + mint, maxt int64 series []RefSeries samples []RefSample } func (a *headAppender) Add(lset labels.Labels, t int64, v float64) (uint64, error) { - if t < a.mint { + if t < a.minValidTime { return 0, ErrOutOfBounds } @@ -611,9 +630,12 @@ func (a *headAppender) AddFast(ref uint64, t int64, v float64) error { if err != nil { return err } - if t < a.mint { + if t < a.minValidTime { return ErrOutOfBounds } + if t < a.mint { + a.mint = t + } if t > a.maxt { a.maxt = t } @@ -682,16 +704,7 @@ func (a *headAppender) Commit() error { } a.head.metrics.samplesAppended.Add(float64(total)) - - for { - ht := a.head.MaxTime() - if a.maxt <= ht { - break - } - if atomic.CompareAndSwapInt64(&a.head.maxTime, ht, a.maxt) { - break - } - } + a.head.updateMinMaxTime(a.mint, a.maxt) return nil } From b81e0fbf2afe1ccdf46e27f7ca8c4f662218ced0 Mon Sep 17 00:00:00 2001 From: Fabian Reinartz Date: Fri, 20 Jul 2018 02:26:12 -0400 Subject: [PATCH 14/17] Address comments Signed-off-by: Fabian Reinartz --- docs/format/wal.md | 12 ++++++++++-- fileutil/fileutil.go | 3 --- wal/wal.go | 2 +- 3 files changed, 11 insertions(+), 6 
deletions(-) diff --git a/docs/format/wal.md b/docs/format/wal.md index ca3fae39de..6127fd0508 100644 --- a/docs/format/wal.md +++ b/docs/format/wal.md @@ -1,6 +1,6 @@ # WAL Disk Format -The write ahead log operates in segments that that are numbered and sequential, +The write ahead log operates in segments that are numbered and sequential, e.g. `000000`, `000001`, `000002`, etc., and are limited to 128MB by default. A segment is written to in pages of 32KB. Only the last page of the most recent segment may be partial. A WAL record is an opaque byte slice that gets split up into sub-records @@ -17,13 +17,21 @@ Notable deviations are that the record fragment is encoded as: └───────────┴──────────┴────────────┴──────────────┘ ``` +The type flag has the following states: + +* `0`: rest of page will be empty +* `1`: a full record encoded in a single fragment +* `2`: first fragment of a record +* `3`: middle fragment of a record +* `4`: final fragment of a record + ## Record encoding The records written to the write ahead log are encoded as follows: ### Series records -Series records encode the labels that identifier a series and its unique ID. +Series records encode the labels that identify a series and its unique ID. ``` ┌────────────────────────────────────────────┐ diff --git a/fileutil/fileutil.go b/fileutil/fileutil.go index 397858958b..a3eb2a7ac4 100644 --- a/fileutil/fileutil.go +++ b/fileutil/fileutil.go @@ -27,9 +27,6 @@ func ReadDir(dirpath string) ([]string, error) { // Rename safely renames a file. func Rename(from, to string) error { - if err := os.RemoveAll(to); err != nil { - return err - } if err := os.Rename(from, to); err != nil { return err } diff --git a/wal/wal.go b/wal/wal.go index 90228184ab..c2b333da00 100644 --- a/wal/wal.go +++ b/wal/wal.go @@ -140,7 +140,7 @@ func OpenReadSegment(fn string) (*Segment, error) { // WAL is a write ahead log that stores records in segment files. // It must be read from start to end once before logging new data. 
-// If an erroe occurs during read, the repair procedure must be called +// If an error occurs during read, the repair procedure must be called // before it's safe to do further writes. // // Segments are written to in pages of 32KB, with records possibly split From f8ec0074e70d0c31b54e23ded4e6d67e0b47acb7 Mon Sep 17 00:00:00 2001 From: Fabian Reinartz Date: Thu, 2 Aug 2018 17:46:45 -0400 Subject: [PATCH 15/17] Add Replace function Signed-off-by: Fabian Reinartz --- checkpoint.go | 6 +++--- docs/format/wal.md | 4 +--- fileutil/fileutil.go | 23 +++++++++++++++++++++++ wal/wal.go | 16 ++++++++-------- 4 files changed, 35 insertions(+), 14 deletions(-) diff --git a/checkpoint.go b/checkpoint.go index 87ff5597e3..d988d35615 100644 --- a/checkpoint.go +++ b/checkpoint.go @@ -37,7 +37,7 @@ type CheckpointStats struct { DroppedTombstones int TotalSeries int // Processed series including dropped ones. TotalSamples int // Processed samples inlcuding dropped ones. - TotalTombstones int // Processed tombstones including droppes ones. + TotalTombstones int // Processed tombstones including dropped ones. } // LastCheckpoint returns the directory name of the most recent checkpoint. @@ -260,8 +260,8 @@ func Checkpoint(logger log.Logger, w *wal.WAL, m, n int, keep func(id uint64) bo if err := cp.Close(); err != nil { return nil, errors.Wrap(err, "close checkpoint") } - if err := fileutil.Rename(cpdirtmp, cpdir); err != nil { - return nil, errors.Wrap(err, "rename checkpoint file") + if err := fileutil.Replace(cpdirtmp, cpdir); err != nil { + return nil, errors.Wrap(err, "rename checkpoint directory") } if err := w.Truncate(n + 1); err != nil { // If truncating fails, we'll just try again at the next checkpoint. diff --git a/docs/format/wal.md b/docs/format/wal.md index 6127fd0508..7195e0bdfb 100644 --- a/docs/format/wal.md +++ b/docs/format/wal.md @@ -7,7 +7,7 @@ may be partial. 
A WAL record is an opaque byte slice that gets split up into sub should it exceed the remaining space of the current page. Records are never split across segment boundaries. If a single record exceeds the default segment size, a segment with a larger size will be created. -The encoding of pages is largely borrowed from [LevelDB's/RocksDB's write ahead log.][1] +The encoding of pages is largely borrowed from [LevelDB's/RocksDB's write ahead log.](https://github.com/facebook/rocksdb/wiki/Write-Ahead-Log-File-Format) Notable deviations are that the record fragment is encoded as: @@ -84,5 +84,3 @@ and specify an interval for which samples of a series got deleted. │ . . . │ └─────────────────────────────────────────────────────┘ ``` - -[1][https://github.com/facebook/rocksdb/wiki/Write-Ahead-Log-File-Format] diff --git a/fileutil/fileutil.go b/fileutil/fileutil.go index a3eb2a7ac4..2158bfd265 100644 --- a/fileutil/fileutil.go +++ b/fileutil/fileutil.go @@ -43,3 +43,26 @@ func Rename(from, to string) error { } return pdir.Close() } + +// Replace moves a file or directory to a new location and deletes any previous data. +// It is not atomic. +func Replace(from, to string) error { + if err := os.RemoveAll(to); err != nil { + return nil + } + if err := os.Rename(from, to); err != nil { + return err + } + + // Directory was renamed; sync parent dir to persist rename. 
+ pdir, err := OpenDir(filepath.Dir(to)) + if err != nil { + return err + } + + if err = Fsync(pdir); err != nil { + pdir.Close() + return err + } + return pdir.Close() +} diff --git a/wal/wal.go b/wal/wal.go index c2b333da00..e59b0e15d0 100644 --- a/wal/wal.go +++ b/wal/wal.go @@ -393,11 +393,11 @@ func (w *WAL) flushPage(clear bool) error { type recType uint8 const ( - recPageTerm recType = 0 // rest of page is empty - recFull recType = 1 // full record - recFirst recType = 2 // first fragment of a record - recMiddle recType = 3 // middle fragments of a record - recLast recType = 4 // final fragment of a record + recPageTerm recType = 0 // Rest of page is empty. + recFull recType = 1 // Full record. + recFirst recType = 2 // First fragment of a record. + recMiddle recType = 3 // Middle fragments of a record. + recLast recType = 4 // Final fragment of a record. ) func (t recType) String() string { @@ -442,8 +442,8 @@ func (w *WAL) log(rec []byte, final bool) error { // If the record is too big to fit within pages in the current // segment, terminate the active segment and advance to the next one. // This ensures that records do not cross segment boundaries. - left := w.page.remaining() - recordHeaderSize // active page - left += (pageSize - recordHeaderSize) * (w.pagesPerSegment() - w.donePages - 1) // free pages + left := w.page.remaining() - recordHeaderSize // Active pages. + left += (pageSize - recordHeaderSize) * (w.pagesPerSegment() - w.donePages - 1) // Free pages. if len(rec) > left { if err := w.nextSegment(); err != nil { @@ -716,7 +716,7 @@ func (r *Reader) next() (err error) { // It's not strictly necessary but may catch sketchy state early. k := pageSize - (r.total % pageSize) if k == pageSize { - continue // initial 0 byte was last page byte + continue // Initial 0 byte was last page byte. 
} n, err := io.ReadFull(r.rdr, buf[:k]) if err != nil { From ee7ee059efb573a1c3f850a1c2f7a43556e98ea8 Mon Sep 17 00:00:00 2001 From: Fabian Reinartz Date: Thu, 2 Aug 2018 17:57:34 -0400 Subject: [PATCH 16/17] Fix doc comments Signed-off-by: Fabian Reinartz --- wal.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/wal.go b/wal.go index 6685dbd063..26857a9dc0 100644 --- a/wal.go +++ b/wal.go @@ -84,7 +84,7 @@ func newWalMetrics(wal *SegmentWAL, r prometheus.Registerer) *walMetrics { // WAL is a write ahead log that can log new series labels and samples. // It must be completely read before new entries are logged. // -// DEPRECATED: use wal pkg combined with the record coders instead. +// DEPRECATED: use wal pkg combined with the record codex instead. type WAL interface { Reader() WALReader LogSeries([]RefSeries) error @@ -1254,7 +1254,7 @@ func MigrateWAL(logger log.Logger, dir string) (err error) { if err != nil { return errors.Wrap(err, "open new WAL") } - // We close it once already before as part of finalization. + // It should've already been closed as part of the previous finalization. // Do it once again in case of prior errors. defer func() { if err != nil { From 74b350116136896efc47a239ef18fd68d16a2a18 Mon Sep 17 00:00:00 2001 From: Fabian Reinartz Date: Fri, 3 Aug 2018 11:25:27 -0400 Subject: [PATCH 17/17] Fix Rename call Signed-off-by: Fabian Reinartz --- wal.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/wal.go b/wal.go index 26857a9dc0..972fdea38c 100644 --- a/wal.go +++ b/wal.go @@ -1303,7 +1303,7 @@ func MigrateWAL(logger log.Logger, dir string) (err error) { if err := repl.Close(); err != nil { return errors.Wrap(err, "close new WAL") } - if err := fileutil.Rename(tmpdir, dir); err != nil { + if err := fileutil.Replace(tmpdir, dir); err != nil { return errors.Wrap(err, "replace old WAL") } return nil