diff --git a/wal.go b/wal.go index c64f130a2..fc08d7ce9 100644 --- a/wal.go +++ b/wal.go @@ -111,9 +111,6 @@ func (w *WAL) ReadAll(h *walHandler) error { // Log writes a batch of new series labels and samples to the log. func (w *WAL) Log(series []labels.Labels, samples []hashedSample) error { - w.mtx.Lock() - defer w.mtx.Unlock() - if err := w.enc.encodeSeries(series); err != nil { return err } @@ -148,12 +145,9 @@ func (w *WAL) run(interval time.Duration) { case <-w.stopc: return case <-tick: - w.mtx.Lock() - if err := w.sync(); err != nil { w.logger.Log("msg", "sync failed", "err", err) } - w.mtx.Unlock() } } } @@ -170,9 +164,8 @@ func (w *WAL) Close() error { } type walEncoder struct { - w *ioutil.PageWriter - - buf []byte + mtx sync.Mutex + w *ioutil.PageWriter } const ( @@ -190,17 +183,22 @@ func newWALEncoder(f *os.File) (*walEncoder, error) { return nil, err } enc := &walEncoder{ - w: ioutil.NewPageWriter(f, walPageBytes, int(offset)), - buf: make([]byte, 0, 1024*1024), + w: ioutil.NewPageWriter(f, walPageBytes, int(offset)), } return enc, nil } func (e *walEncoder) flush() error { + e.mtx.Lock() + defer e.mtx.Unlock() + return e.w.Flush() } -func (e *walEncoder) entry(et WALEntryType, flag byte, n int) error { +func (e *walEncoder) entry(et WALEntryType, flag byte, buf []byte) error { + e.mtx.Lock() + defer e.mtx.Unlock() + h := crc32.NewIEEE() w := io.MultiWriter(h, e.w) @@ -208,20 +206,19 @@ func (e *walEncoder) entry(et WALEntryType, flag byte, n int) error { b[0] = byte(et) b[1] = flag - binary.BigEndian.PutUint32(b[2:], uint32(len(e.buf))) + binary.BigEndian.PutUint32(b[2:], uint32(len(buf))) if _, err := w.Write(b); err != nil { return err } - if _, err := w.Write(e.buf[:n]); err != nil { + if _, err := w.Write(buf); err != nil { return err } if _, err := e.w.Write(h.Sum(nil)); err != nil { return err } - e.buf = e.buf[:0] - + putWALBuffer(buf) return nil } @@ -230,29 +227,45 @@ const ( walSamplesSimple = 1 ) +var walBuffers = sync.Pool{} + +func getWALBuffer() []byte { + b := walBuffers.Get() + if b == nil { + return make([]byte, 0, 64*1024) + } + return b.([]byte) +} + +func putWALBuffer(b []byte) { + b = b[:0] + walBuffers.Put(b) +} + func (e *walEncoder) encodeSeries(series []labels.Labels) error { if len(series) == 0 { return nil } b := make([]byte, binary.MaxVarintLen32) + buf := getWALBuffer() for _, lset := range series { n := binary.PutUvarint(b, uint64(len(lset))) - e.buf = append(e.buf, b[:n]...) + buf = append(buf, b[:n]...) for _, l := range lset { n = binary.PutUvarint(b, uint64(len(l.Name))) - e.buf = append(e.buf, b[:n]...) - e.buf = append(e.buf, l.Name...) + buf = append(buf, b[:n]...) + buf = append(buf, l.Name...) n = binary.PutUvarint(b, uint64(len(l.Value))) - e.buf = append(e.buf, b[:n]...) - e.buf = append(e.buf, l.Value...) + buf = append(buf, b[:n]...) + buf = append(buf, l.Value...) } } - return e.entry(WALEntrySeries, walSeriesSimple, len(e.buf)) + return e.entry(WALEntrySeries, walSeriesSimple, buf) } func (e *walEncoder) encodeSamples(samples []hashedSample) error { @@ -261,6 +274,7 @@ func (e *walEncoder) encodeSamples(samples []hashedSample) error { } b := make([]byte, binary.MaxVarintLen64) + buf := getWALBuffer() // Store base timestamp and base reference number of first sample. // All samples encode their timestamp and ref as delta to those. @@ -269,22 +283,22 @@ func (e *walEncoder) encodeSamples(samples []hashedSample) error { first := samples[0] binary.BigEndian.PutUint32(b, first.ref) - e.buf = append(e.buf, b[:4]...) + buf = append(buf, b[:4]...) binary.BigEndian.PutUint64(b, uint64(first.t)) - e.buf = append(e.buf, b[:8]...) + buf = append(buf, b[:8]...) for _, s := range samples { n := binary.PutVarint(b, int64(s.ref)-int64(first.ref)) - e.buf = append(e.buf, b[:n]...) + buf = append(buf, b[:n]...) n = binary.PutVarint(b, s.t-first.t) - e.buf = append(e.buf, b[:n]...) + buf = append(buf, b[:n]...) binary.BigEndian.PutUint64(b, math.Float64bits(s.v)) - e.buf = append(e.buf, b[:8]...) + buf = append(buf, b[:8]...) } - return e.entry(WALEntrySamples, walSamplesSimple, len(e.buf)) + return e.entry(WALEntrySamples, walSamplesSimple, buf) } type walDecoder struct {