diff --git a/db.go b/db.go
index 3d990cdbf..5ef35704c 100644
--- a/db.go
+++ b/db.go
@@ -41,7 +41,7 @@ type DB struct {
 
 // TODO(fabxc): make configurable
 const (
-	shardShift   = 3
+	shardShift   = 4
 	numShards    = 1 << shardShift
 	maxChunkSize = 1024
 )
diff --git a/head.go b/head.go
index afd9f39aa..b7a3b8432 100644
--- a/head.go
+++ b/head.go
@@ -31,7 +31,7 @@ type HeadBlock struct {
 
 // NewHeadBlock creates a new empty head block.
 func NewHeadBlock(dir string, baseTime int64) (*HeadBlock, error) {
-	wal, err := CreateWAL(dir)
+	wal, err := OpenWAL(dir)
 	if err != nil {
 		return nil, err
 	}
@@ -45,6 +45,21 @@ func NewHeadBlock(dir string, baseTime int64) (*HeadBlock, error) {
 	}
 	b.stats.MinTime = baseTime
 
+	err = wal.ReadAll(&walHandler{
+		series: func(lset labels.Labels) {
+			b.create(lset.Hash(), lset)
+		},
+		sample: func(s hashedSample) {
+			if err := b.descs[s.ref].append(s.t, s.v); err != nil {
+				panic(err) // TODO(fabxc): cannot actually error
+			}
+			b.stats.SampleCount++
+		},
+	})
+	if err != nil {
+		return nil, err
+	}
+
 	return b, nil
 }
diff --git a/wal.go b/wal.go
index 0d3cbda97..c3b128b0f 100644
--- a/wal.go
+++ b/wal.go
@@ -10,6 +10,7 @@ import (
 
 	"github.com/coreos/etcd/pkg/fileutil"
 	"github.com/fabxc/tsdb/labels"
+	"github.com/pkg/errors"
 )
 
 // WALEntryType indicates what data a WAL entry contains.
@@ -31,20 +32,28 @@ type WAL struct {
 	symbols map[string]uint32
 }
 
-// CreateWAL creates a new write ahead log in the given directory.
-func CreateWAL(dir string) (*WAL, error) {
+// OpenWAL opens or creates a write ahead log in the given directory.
+// The WAL must be read completely before new data is written.
+func OpenWAL(dir string) (*WAL, error) {
 	if err := os.MkdirAll(dir, 0777); err != nil {
 		return nil, err
 	}
 	p := filepath.Join(dir, "wal")
 
-	f, err := fileutil.LockFile(p, os.O_WRONLY|os.O_CREATE, fileutil.PrivateFileMode)
+	f, err := fileutil.TryLockFile(p, os.O_RDWR, fileutil.PrivateFileMode)
 	if err != nil {
-		return nil, err
-	}
-	if _, err = f.Seek(0, os.SEEK_END); err != nil {
-		return nil, err
+		if !os.IsNotExist(err) {
+			return nil, err
+		}
+
+		f, err = fileutil.LockFile(p, os.O_WRONLY|os.O_CREATE, fileutil.PrivateFileMode)
+		if err != nil {
+			return nil, err
+		}
+		if _, err = f.Seek(0, os.SEEK_END); err != nil {
+			return nil, err
+		}
 	}
 
 	w := &WAL{
@@ -55,6 +64,27 @@ func CreateWAL(dir string) (*WAL, error) {
 	return w, nil
 }
 
+type walHandler struct {
+	sample func(hashedSample)
+	series func(labels.Labels)
+}
+
+func (w *WAL) ReadAll(h *walHandler) error {
+	dec := &walDecoder{
+		r:       w.f,
+		handler: h,
+	}
+
+	for {
+		if err := dec.entry(); err != nil {
+			if err == io.EOF {
+				return nil
+			}
+			return err
+		}
+	}
+}
+
 // Log writes a batch of new series labels and samples to the log.
 func (w *WAL) Log(series []labels.Labels, samples []hashedSample) error {
 	if err := w.enc.encodeSeries(series); err != nil {
@@ -78,11 +108,6 @@ func (w *WAL) Close() error {
 	return w.f.Close()
 }
 
-// OpenWAL does things.
-func OpenWAL(dir string) (*WAL, error) {
-	return nil, nil
-}
-
 type walEncoder struct {
 	w io.Writer
@@ -92,7 +117,7 @@ type walEncoder struct {
 func newWALEncoder(w io.Writer) *walEncoder {
 	return &walEncoder{
 		w:   w,
-		buf: make([]byte, 1024*1024),
+		buf: make([]byte, 0, 1024*1024),
 	}
 }
@@ -116,6 +141,8 @@ func (e *walEncoder) entry(et WALEntryType, flag byte, n int) error {
 		return err
 	}
 
+	e.buf = e.buf[:0]
+
 	return nil
 }
@@ -128,35 +155,33 @@ func (e *walEncoder) encodeSeries(series []labels.Labels) error {
 	if len(series) == 0 {
 		return nil
 	}
-	var (
-		b   = make([]byte, binary.MaxVarintLen32)
-		buf = e.buf[:0]
-	)
+
+	b := make([]byte, binary.MaxVarintLen32)
 
 	for _, lset := range series {
 		n := binary.PutUvarint(b, uint64(len(lset)))
-		buf = append(buf, b[:n]...)
+		e.buf = append(e.buf, b[:n]...)
 
 		for _, l := range lset {
 			n = binary.PutUvarint(b, uint64(len(l.Name)))
-			buf = append(buf, b[:n]...)
+			e.buf = append(e.buf, b[:n]...)
+			e.buf = append(e.buf, l.Name...)
 
 			n = binary.PutUvarint(b, uint64(len(l.Value)))
-			buf = append(buf, b[:n]...)
+			e.buf = append(e.buf, b[:n]...)
+			e.buf = append(e.buf, l.Value...)
 		}
 	}
 
-	return e.entry(WALEntrySeries, walSeriesSimple, len(buf))
+	return e.entry(WALEntrySeries, walSeriesSimple, len(e.buf))
 }
 
 func (e *walEncoder) encodeSamples(samples []hashedSample) error {
 	if len(samples) == 0 {
 		return nil
 	}
-	var (
-		b   = make([]byte, binary.MaxVarintLen64)
-		buf = e.buf[:0]
-	)
+
+	b := make([]byte, binary.MaxVarintLen64)
 
 	// Store base timestamp and base reference number of first sample.
 	// All samples encode their timestamp and ref as delta to those.
@@ -165,27 +190,139 @@ func (e *walEncoder) encodeSamples(samples []hashedSample) error {
 	first := samples[0]
 	binary.BigEndian.PutUint32(b, first.ref)
-	buf = append(buf, b[:4]...)
+	e.buf = append(e.buf, b[:4]...)
 
 	binary.BigEndian.PutUint64(b, uint64(first.t))
-	buf = append(buf, b[:8]...)
+	e.buf = append(e.buf, b[:8]...)
 
 	for _, s := range samples {
 		n := binary.PutVarint(b, int64(s.ref)-int64(first.ref))
-		buf = append(buf, b[:n]...)
+		e.buf = append(e.buf, b[:n]...)
 
 		n = binary.PutVarint(b, s.t-first.t)
-		buf = append(buf, b[:n]...)
+		e.buf = append(e.buf, b[:n]...)
 
 		binary.BigEndian.PutUint64(b, math.Float64bits(s.v))
-		buf = append(buf, b[:8]...)
+		e.buf = append(e.buf, b[:8]...)
 	}
 
-	return e.entry(WALEntrySamples, walSamplesSimple, len(buf))
+	return e.entry(WALEntrySamples, walSamplesSimple, len(e.buf))
 }
 
 type walDecoder struct {
-	r io.Reader
+	r       io.Reader
+	handler *walHandler
 
-	handleSeries func(labels.Labels)
-	handleSample func(hashedSample)
+	buf []byte
+}
+
+func newWALDecoder(r io.Reader, h *walHandler) *walDecoder {
+	return &walDecoder{
+		r:       r,
+		handler: h,
+		buf:     make([]byte, 0, 1024*1024),
+	}
+}
+
+func (d *walDecoder) decodeSeries(flag byte, b []byte) error {
+	for len(b) > 0 {
+		l, n := binary.Uvarint(b)
+		if n < 1 {
+			return errors.Wrap(errInvalidSize, "number of labels")
+		}
+		b = b[n:]
+		lset := make(labels.Labels, l)
+
+		for i := 0; i < int(l); i++ {
+			nl, n := binary.Uvarint(b)
+			if n < 1 || len(b) < n+int(nl) {
+				return errors.Wrap(errInvalidSize, "label name")
+			}
+			lset[i].Name = string(b[n : n+int(nl)])
+			b = b[n+int(nl):]
+
+			vl, n := binary.Uvarint(b)
+			if n < 1 || len(b) < n+int(vl) {
+				return errors.Wrap(errInvalidSize, "label value")
+			}
+			lset[i].Value = string(b[n : n+int(vl)])
+			b = b[n+int(vl):]
+		}
+
+		d.handler.series(lset)
+	}
+	return nil
+}
+
+func (d *walDecoder) decodeSamples(flag byte, b []byte) error {
+	if len(b) < 12 {
+		return errors.Wrap(errInvalidSize, "header length")
+	}
+	var (
+		baseRef  = binary.BigEndian.Uint32(b)
+		baseTime = int64(binary.BigEndian.Uint64(b[4:]))
+	)
+	b = b[12:]
+
+	for len(b) > 0 {
+		var smpl hashedSample
+
+		dref, n := binary.Varint(b)
+		if n < 1 {
+			return errors.Wrap(errInvalidSize, "sample ref delta")
+		}
+		b = b[n:]
+		smpl.ref = uint32(int64(baseRef) + dref)
+
+		dtime, n := binary.Varint(b)
+		if n < 1 {
+			return errors.Wrap(errInvalidSize, "sample timestamp delta")
+		}
+		b = b[n:]
+		smpl.t = baseTime + dtime
+
+		if len(b) < 8 {
+			return errors.Wrapf(errInvalidSize, "sample value bits %d", len(b))
+		}
+		smpl.v = float64(math.Float64frombits(binary.BigEndian.Uint64(b)))
+		b = b[8:]
+
+		d.handler.sample(smpl)
+	}
+	return nil
+}
+
+func (d *walDecoder) entry() error {
+	b := make([]byte, 6)
+	if _, err := d.r.Read(b); err != nil {
+		return err
+	}
+
+	var (
+		etype  = WALEntryType(b[0])
+		flag   = b[1]
+		length = int(binary.BigEndian.Uint32(b[2:]))
+	)
+
+	if length > len(d.buf) {
+		d.buf = make([]byte, length)
+	}
+	buf := d.buf[:length]
+
+	if _, err := d.r.Read(buf); err != nil {
+		return err
+	}
+	// Read away checksum.
+	// TODO(fabxc): verify it
+	if _, err := d.r.Read(b[:4]); err != nil {
+		return err
+	}
+
+	switch etype {
+	case WALEntrySeries:
+		return d.decodeSeries(flag, buf)
+	case WALEntrySamples:
+		return d.decodeSamples(flag, buf)
+	}
+
+	return errors.Errorf("unknown WAL entry type %q", etype)
 }
diff --git a/wal_test.go b/wal_test.go
new file mode 100644
index 000000000..8849c08a3
--- /dev/null
+++ b/wal_test.go
@@ -0,0 +1,234 @@
+package tsdb
+
+import (
+	"fmt"
+	"io"
+	"io/ioutil"
+	"os"
+	"testing"
+
+	"github.com/fabxc/tsdb/labels"
+	dto "github.com/prometheus/client_model/go"
+	"github.com/prometheus/common/expfmt"
+	"github.com/prometheus/common/model"
+	"github.com/stretchr/testify/require"
+)
+
+func BenchmarkWALWrite(b *testing.B) {
+	d, err := ioutil.TempDir("", "wal_read_test")
+	require.NoError(b, err)
+
+	defer func() {
+		require.NoError(b, os.RemoveAll(d))
+	}()
+
+	wal, err := OpenWAL(d)
+	require.NoError(b, err)
+
+	f, err := os.Open("cmd/tsdb/testdata.1m")
+	require.NoError(b, err)
+
+	series, err := readPrometheusLabels(f, b.N)
+	require.NoError(b, err)
+
+	var (
+		samples [][]hashedSample
+		ts      int64
+	)
+	for i := 0; i < 300; i++ {
+		ts += int64(30000)
+		scrape := make([]hashedSample, 0, len(series))
+
+		for ref := range series {
+			scrape = append(scrape, hashedSample{
+				ref: uint32(ref),
+				t:   ts,
+				v:   12345788,
+			})
+		}
+		samples = append(samples, scrape)
+	}
+
+	b.ResetTimer()
+
+	err = wal.Log(series, samples[0])
+	require.NoError(b, err)
+
+	for _, s := range samples[1:] {
+		err = wal.Log(nil, s)
+		require.NoError(b, err)
+	}
+
+	require.NoError(b, wal.Close())
+}
+
+func BenchmarkWALRead(b *testing.B) {
+	f, err := os.Open("cmd/tsdb/testdata.1m")
+	require.NoError(b, err)
+
+	series, err := readPrometheusLabels(f, 1000000)
+	require.NoError(b, err)
+
+	b.Run("test", func(b *testing.B) {
+		bseries := series[:b.N]
+
+		d, err := ioutil.TempDir("", "wal_read_test")
+		require.NoError(b, err)
+
+		defer func() {
+			require.NoError(b, os.RemoveAll(d))
+		}()
+
+		wal, err := OpenWAL(d)
+		require.NoError(b, err)
+
+		var (
+			samples [][]hashedSample
+			ts      int64
+		)
+		for i := 0; i < 300; i++ {
+			ts += int64(30000)
+			scrape := make([]hashedSample, 0, len(bseries))
+
+			for ref := range bseries {
+				scrape = append(scrape, hashedSample{
+					ref: uint32(ref),
+					t:   ts,
+					v:   12345788,
+				})
+			}
+			samples = append(samples, scrape)
+		}
+
+		err = wal.Log(bseries, samples[0])
+		require.NoError(b, err)
+
+		for _, s := range samples[1:] {
+			err = wal.Log(nil, s)
+			require.NoError(b, err)
+		}
+
+		require.NoError(b, wal.Close())
+
+		b.ResetTimer()
+
+		wal, err = OpenWAL(d)
+		require.NoError(b, err)
+
+		var numSeries, numSamples int
+
+		err = wal.ReadAll(&walHandler{
+			series: func(lset labels.Labels) { numSeries++ },
+			sample: func(smpl hashedSample) { numSamples++ },
+		})
+		require.NoError(b, err)
+
+		stat, _ := wal.f.Stat()
+		fmt.Println("read series", numSeries, "read samples", numSamples, "wal size", fmt.Sprintf("%.2fMiB", float64(stat.Size())/1024/1024))
+	})
+}
+
+func BenchmarkWALReadIntoHead(b *testing.B) {
+	f, err := os.Open("cmd/tsdb/testdata.1m")
+	require.NoError(b, err)
+
+	series, err := readPrometheusLabels(f, 1000000)
+	require.NoError(b, err)
+
+	b.Run("test", func(b *testing.B) {
+		bseries := series[:b.N]
+
+		d, err := ioutil.TempDir("", "wal_read_test")
+		require.NoError(b, err)
+
+		defer func() {
+			require.NoError(b, os.RemoveAll(d))
+		}()
+
+		wal, err := OpenWAL(d)
+		require.NoError(b, err)
+
+		var (
+			samples [][]hashedSample
+			ts      int64
+		)
+		for i := 0; i < 300; i++ {
+			ts += int64(30000)
+			scrape := make([]hashedSample, 0, len(bseries))
+
+			for ref := range bseries {
+				scrape = append(scrape, hashedSample{
+					ref: uint32(ref),
+					t:   ts,
+					v:   12345788,
+				})
+			}
+			samples = append(samples, scrape)
+		}
+
+		err = wal.Log(bseries, samples[0])
+		require.NoError(b, err)
+
+		for _, s := range samples[1:] {
+			err = wal.Log(nil, s)
+			require.NoError(b, err)
+		}
+
+		require.NoError(b, wal.Close())
+
+		b.ResetTimer()
+
+		head, err := NewHeadBlock(d, 0)
+		require.NoError(b, err)
+
+		stat, _ := head.wal.f.Stat()
+		fmt.Println("head block initialized from WAL")
+		fmt.Println("read series", head.stats.SeriesCount, "read samples", head.stats.SampleCount, "wal size", fmt.Sprintf("%.2fMiB", float64(stat.Size())/1024/1024))
+	})
+}
+
+func readPrometheusLabels(r io.Reader, n int) ([]labels.Labels, error) {
+	dec := expfmt.NewDecoder(r, expfmt.FmtProtoText)
+
+	var mets []model.Metric
+	fps := map[model.Fingerprint]struct{}{}
+	var mf dto.MetricFamily
+	var dups int
+
+	for i := 0; i < n; {
+		if err := dec.Decode(&mf); err != nil {
+			if err == io.EOF {
+				break
+			}
+			return nil, err
+		}
+
+		for _, m := range mf.GetMetric() {
+			met := make(model.Metric, len(m.GetLabel())+1)
+			met["__name__"] = model.LabelValue(mf.GetName())
+
+			for _, l := range m.GetLabel() {
+				met[model.LabelName(l.GetName())] = model.LabelValue(l.GetValue())
+			}
+			if _, ok := fps[met.Fingerprint()]; ok {
+				dups++
+			} else {
+				mets = append(mets, met)
+				fps[met.Fingerprint()] = struct{}{}
+			}
+			i++
+		}
+	}
+
+	lbls := make([]labels.Labels, 0, n)
+
+	for _, m := range mets {
+		lset := make(labels.Labels, 0, len(m))
+		for k, v := range m {
+			lset = append(lset, labels.Label{Name: string(k), Value: string(v)})
+		}
+		lbls = append(lbls, lset)
+	}
+
+	return lbls, nil
+}
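The head.go and wal_test.go changes above both follow the same open, replay, then append pattern that the new OpenWAL contract requires. Below is a minimal sketch of that calling convention from inside the package, assuming only the types in this diff; the replayWAL helper name and its comments are illustrative and not part of the change.

package tsdb

import "github.com/fabxc/tsdb/labels"

// replayWAL is an illustrative helper: it opens the write ahead log, replays
// every stored series and sample through the handler callbacks, and only
// afterwards is the WAL ready for new Log calls, as OpenWAL's comment demands.
func replayWAL(dir string) (*WAL, error) {
	wal, err := OpenWAL(dir)
	if err != nil {
		return nil, err
	}
	err = wal.ReadAll(&walHandler{
		series: func(lset labels.Labels) {
			// Recreate in-memory state for the series, as NewHeadBlock does via b.create.
		},
		sample: func(s hashedSample) {
			// Re-append the sample to its series, as NewHeadBlock does.
		},
	})
	if err != nil {
		return nil, err
	}
	// The file offset now sits at the end of the log, so subsequent
	// wal.Log(series, samples) calls append new entries.
	return wal, nil
}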