From 535532ca02ec4ac11a2ef5844525953598a7d866 Mon Sep 17 00:00:00 2001 From: Fabian Reinartz Date: Fri, 12 May 2017 17:06:26 +0200 Subject: [PATCH] Export refdSample The type was part of a exported method signatures and should therefore be exported as well. --- head.go | 38 ++++++++++++---------------- wal.go | 71 ++++++++++++++++++++++++++++------------------------- wal_test.go | 34 ++++++++++++------------- 3 files changed, 71 insertions(+), 72 deletions(-) diff --git a/head.go b/head.go index b39385fb6..6e3dd21c4 100644 --- a/head.go +++ b/head.go @@ -131,13 +131,13 @@ Outer: h.meta.Stats.NumSeries++ } for _, s := range samples { - if int(s.ref) >= len(h.series) { - l.Log("msg", "unknown series reference, abort WAL restore", "got", s.ref, "max", len(h.series)-1) + if int(s.Ref) >= len(h.series) { + l.Log("msg", "unknown series reference, abort WAL restore", "got", s.Ref, "max", len(h.series)-1) break Outer } - h.series[s.ref].append(s.t, s.v) + h.series[s.Ref].append(s.T, s.V) - if !h.inBounds(s.t) { + if !h.inBounds(s.T) { return nil, errors.Wrap(ErrOutOfBounds, "consume WAL") } h.meta.Stats.NumSamples++ @@ -262,15 +262,15 @@ func (h *HeadBlock) Busy() bool { var headPool = sync.Pool{} -func getHeadAppendBuffer() []refdSample { +func getHeadAppendBuffer() []RefSample { b := headPool.Get() if b == nil { - return make([]refdSample, 0, 512) + return make([]RefSample, 0, 512) } - return b.([]refdSample) + return b.([]RefSample) } -func putHeadAppendBuffer(b []refdSample) { +func putHeadAppendBuffer(b []RefSample) { headPool.Put(b[:0]) } @@ -282,7 +282,7 @@ type headAppender struct { refmap map[uint64]uint64 newLabels []labels.Labels - samples []refdSample + samples []RefSample } type hashedLabels struct { @@ -290,12 +290,6 @@ type hashedLabels struct { labels labels.Labels } -type refdSample struct { - ref uint64 - t int64 - v float64 -} - func (a *headAppender) Add(lset labels.Labels, t int64, v float64) (uint64, error) { if !a.inBounds(t) { return 0, ErrOutOfBounds @@ -370,10 +364,10 @@ func (a *headAppender) AddFast(ref uint64, t int64, v float64) error { } } - a.samples = append(a.samples, refdSample{ - ref: ref, - t: t, - v: v, + a.samples = append(a.samples, RefSample{ + Ref: ref, + T: t, + V: v, }) return nil } @@ -419,8 +413,8 @@ func (a *headAppender) Commit() error { for i := range a.samples { s := &a.samples[i] - if s.ref&(1<<32) > 0 { - s.ref = a.refmap[s.ref] + if s.Ref&(1<<32) > 0 { + s.Ref = a.refmap[s.Ref] } } @@ -434,7 +428,7 @@ func (a *headAppender) Commit() error { total := uint64(len(a.samples)) for _, s := range a.samples { - if !a.series[s.ref].append(s.t, s.v) { + if !a.series[s.Ref].append(s.T, s.V) { total-- } } diff --git a/wal.go b/wal.go index 853065f69..80f3508e8 100644 --- a/wal.go +++ b/wal.go @@ -50,7 +50,7 @@ const ( ) // WAL is a write ahead log for series data. It can only be written to. -// Use WALReader to read back from a write ahead log. +// Use walReader to read back from a write ahead log. type WAL struct { mtx sync.Mutex @@ -69,6 +69,13 @@ type WAL struct { donec chan struct{} } +// RefSample is a timestamp/value pair associated with a reference to a series. +type RefSample struct { + Ref uint64 + T int64 + V float64 +} + const ( walDirName = "wal" walSegmentSizeBytes = 256 * 1024 * 1024 // 256 MB @@ -119,12 +126,12 @@ func OpenWAL(dir string, logger log.Logger, flushInterval time.Duration) (*WAL, // Reader returns a new reader over the the write ahead log data. // It must be completely consumed before writing to the WAL. -func (w *WAL) Reader() *WALReader { - return NewWALReader(w.logger, w) +func (w *WAL) Reader() WALReader { + return newWALReader(w, w.logger) } // Log writes a batch of new series labels and samples to the log. -func (w *WAL) Log(series []labels.Labels, samples []refdSample) error { +func (w *WAL) Log(series []labels.Labels, samples []RefSample) error { if err := w.encodeSeries(series); err != nil { return err } @@ -395,7 +402,7 @@ func (w *WAL) encodeSeries(series []labels.Labels) error { return w.entry(WALEntrySeries, walSeriesSimple, buf) } -func (w *WAL) encodeSamples(samples []refdSample) error { +func (w *WAL) encodeSamples(samples []RefSample) error { if len(samples) == 0 { return nil } @@ -409,27 +416,27 @@ func (w *WAL) encodeSamples(samples []refdSample) error { // TODO(fabxc): optimize for all samples having the same timestamp. first := samples[0] - binary.BigEndian.PutUint64(b, first.ref) + binary.BigEndian.PutUint64(b, first.Ref) buf = append(buf, b[:8]...) - binary.BigEndian.PutUint64(b, uint64(first.t)) + binary.BigEndian.PutUint64(b, uint64(first.T)) buf = append(buf, b[:8]...) for _, s := range samples { - n := binary.PutVarint(b, int64(s.ref)-int64(first.ref)) + n := binary.PutVarint(b, int64(s.Ref)-int64(first.Ref)) buf = append(buf, b[:n]...) - n = binary.PutVarint(b, s.t-first.t) + n = binary.PutVarint(b, s.T-first.T) buf = append(buf, b[:n]...) - binary.BigEndian.PutUint64(b, math.Float64bits(s.v)) + binary.BigEndian.PutUint64(b, math.Float64bits(s.V)) buf = append(buf, b[:8]...) } return w.entry(WALEntrySamples, walSamplesSimple, buf) } -// WALReader decodes and emits write ahead log entries. -type WALReader struct { +// walReader decodes and emits write ahead log entries. +type walReader struct { logger log.Logger wal *WAL @@ -439,37 +446,35 @@ type WALReader struct { err error labels []labels.Labels - samples []refdSample + samples []RefSample } -// NewWALReader returns a new WALReader over the sequence of the given ReadClosers. -func NewWALReader(logger log.Logger, w *WAL) *WALReader { - if logger == nil { - logger = log.NewNopLogger() +func newWALReader(w *WAL, l log.Logger) *walReader { + if l == nil { + l = log.NewNopLogger() } - r := &WALReader{ - logger: logger, + return &walReader{ + logger: l, wal: w, buf: make([]byte, 0, 128*4096), crc32: crc32.New(crc32.MakeTable(crc32.Castagnoli)), } - return r } // At returns the last decoded entry of labels or samples. // The returned slices are only valid until the next call to Next(). Their elements // have to be copied to preserve them. -func (r *WALReader) At() ([]labels.Labels, []refdSample) { +func (r *walReader) At() ([]labels.Labels, []RefSample) { return r.labels, r.samples } // Err returns the last error the reader encountered. -func (r *WALReader) Err() error { +func (r *walReader) Err() error { return r.err } // nextEntry retrieves the next entry. It is also used as a testing hook. -func (r *WALReader) nextEntry() (WALEntryType, byte, []byte, error) { +func (r *walReader) nextEntry() (WALEntryType, byte, []byte, error) { if r.cur >= len(r.wal.files) { return 0, 0, nil, io.EOF } @@ -492,7 +497,7 @@ func (r *WALReader) nextEntry() (WALEntryType, byte, []byte, error) { // Next returns decodes the next entry pair and returns true // if it was succesful. -func (r *WALReader) Next() bool { +func (r *walReader) Next() bool { r.labels = r.labels[:0] r.samples = r.samples[:0] @@ -549,12 +554,12 @@ func (r *WALReader) Next() bool { return r.err == nil } -func (r *WALReader) current() *os.File { +func (r *walReader) current() *os.File { return r.wal.files[r.cur] } // truncate the WAL after the last valid entry. -func (r *WALReader) truncate(lastOffset int64) error { +func (r *walReader) truncate(lastOffset int64) error { r.logger.Log("msg", "WAL corruption detected; truncating", "err", r.err, "file", r.current().Name(), "pos", lastOffset) @@ -582,7 +587,7 @@ func walCorruptionErrf(s string, args ...interface{}) error { return walCorruptionErr(errors.Errorf(s, args...)) } -func (r *WALReader) entry(cr io.Reader) (WALEntryType, byte, []byte, error) { +func (r *walReader) entry(cr io.Reader) (WALEntryType, byte, []byte, error) { r.crc32.Reset() tr := io.TeeReader(cr, r.crc32) @@ -629,7 +634,7 @@ func (r *WALReader) entry(cr io.Reader) (WALEntryType, byte, []byte, error) { return etype, flag, buf, nil } -func (r *WALReader) decodeSeries(flag byte, b []byte) error { +func (r *walReader) decodeSeries(flag byte, b []byte) error { for len(b) > 0 { l, n := binary.Uvarint(b) if n < 1 { @@ -659,7 +664,7 @@ func (r *WALReader) decodeSeries(flag byte, b []byte) error { return nil } -func (r *WALReader) decodeSamples(flag byte, b []byte) error { +func (r *walReader) decodeSamples(flag byte, b []byte) error { if len(b) < 16 { return errors.Wrap(errInvalidSize, "header length") } @@ -670,7 +675,7 @@ func (r *WALReader) decodeSamples(flag byte, b []byte) error { b = b[16:] for len(b) > 0 { - var smpl refdSample + var smpl RefSample dref, n := binary.Varint(b) if n < 1 { @@ -678,19 +683,19 @@ func (r *WALReader) decodeSamples(flag byte, b []byte) error { } b = b[n:] - smpl.ref = uint64(int64(baseRef) + dref) + smpl.Ref = uint64(int64(baseRef) + dref) dtime, n := binary.Varint(b) if n < 1 { return errors.Wrap(errInvalidSize, "sample timestamp delta") } b = b[n:] - smpl.t = baseTime + dtime + smpl.T = baseTime + dtime if len(b) < 8 { return errors.Wrapf(errInvalidSize, "sample value bits %d", len(b)) } - smpl.v = float64(math.Float64frombits(binary.BigEndian.Uint64(b))) + smpl.V = float64(math.Float64frombits(binary.BigEndian.Uint64(b))) b = b[8:] r.samples = append(r.samples, smpl) diff --git a/wal_test.go b/wal_test.go index 34508a1b4..99822ec01 100644 --- a/wal_test.go +++ b/wal_test.go @@ -122,7 +122,7 @@ func TestWAL_cut(t *testing.T) { // We cannot actually check for correct pre-allocation as it is // optional per filesystem and handled transparently. - et, flag, b, err := NewWALReader(nil, nil).entry(f) + et, flag, b, err := newWALReader(nil, nil).entry(f) require.NoError(t, err) require.Equal(t, WALEntrySeries, et) require.Equal(t, flag, byte(walSeriesSimple)) @@ -148,7 +148,7 @@ func TestWAL_Log_Restore(t *testing.T) { var ( recordedSeries [][]labels.Labels - recordedSamples [][]refdSample + recordedSamples [][]RefSample ) var totalSamples int @@ -165,7 +165,7 @@ func TestWAL_Log_Restore(t *testing.T) { var ( resultSeries [][]labels.Labels - resultSamples [][]refdSample + resultSamples [][]RefSample ) for r.Next() { @@ -177,7 +177,7 @@ func TestWAL_Log_Restore(t *testing.T) { resultSeries = append(resultSeries, clsets) } if len(smpls) > 0 { - csmpls := make([]refdSample, len(smpls)) + csmpls := make([]RefSample, len(smpls)) copy(csmpls, smpls) resultSamples = append(resultSamples, csmpls) } @@ -191,13 +191,13 @@ func TestWAL_Log_Restore(t *testing.T) { // Insert in batches and generate different amounts of samples for each. for i := 0; i < len(series); i += stepSize { - var samples []refdSample + var samples []RefSample for j := 0; j < i*10; j++ { - samples = append(samples, refdSample{ - ref: uint64(j % 10000), - t: int64(j * 2), - v: rand.Float64(), + samples = append(samples, RefSample{ + Ref: uint64(j % 10000), + T: int64(j * 2), + V: rand.Float64(), }) } @@ -292,13 +292,13 @@ func TestWALRestoreCorrupted(t *testing.T) { w, err := OpenWAL(dir, nil, 0) require.NoError(t, err) - require.NoError(t, w.Log(nil, []refdSample{{t: 1, v: 2}})) - require.NoError(t, w.Log(nil, []refdSample{{t: 2, v: 3}})) + require.NoError(t, w.Log(nil, []RefSample{{T: 1, V: 2}})) + require.NoError(t, w.Log(nil, []RefSample{{T: 2, V: 3}})) require.NoError(t, w.cut()) - require.NoError(t, w.Log(nil, []refdSample{{t: 3, v: 4}})) - require.NoError(t, w.Log(nil, []refdSample{{t: 5, v: 6}})) + require.NoError(t, w.Log(nil, []RefSample{{T: 3, V: 4}})) + require.NoError(t, w.Log(nil, []RefSample{{T: 5, V: 6}})) require.NoError(t, w.Close()) @@ -318,13 +318,13 @@ func TestWALRestoreCorrupted(t *testing.T) { require.True(t, r.Next()) l, s := r.At() require.Equal(t, 0, len(l)) - require.Equal(t, []refdSample{{t: 1, v: 2}}, s) + require.Equal(t, []RefSample{{T: 1, V: 2}}, s) // Truncation should happen transparently and now cause an error. require.False(t, r.Next()) require.Nil(t, r.Err()) - require.NoError(t, w2.Log(nil, []refdSample{{t: 99, v: 100}})) + require.NoError(t, w2.Log(nil, []RefSample{{T: 99, V: 100}})) require.NoError(t, w2.Close()) files, err := fileutil.ReadDir(dir) @@ -341,12 +341,12 @@ func TestWALRestoreCorrupted(t *testing.T) { require.True(t, r.Next()) l, s = r.At() require.Equal(t, 0, len(l)) - require.Equal(t, []refdSample{{t: 1, v: 2}}, s) + require.Equal(t, []RefSample{{T: 1, V: 2}}, s) require.True(t, r.Next()) l, s = r.At() require.Equal(t, 0, len(l)) - require.Equal(t, []refdSample{{t: 99, v: 100}}, s) + require.Equal(t, []RefSample{{T: 99, V: 100}}, s) require.False(t, r.Next()) require.Nil(t, r.Err())