From 582af1618cd64421e7dc530e9a3abfd6fbe3b9cf Mon Sep 17 00:00:00 2001
From: beorn7
Date: Mon, 25 Jan 2016 16:36:36 +0100
Subject: [PATCH] Streamline chunk writing

This helps to avoid allocations in the same way we were already doing it
during reading.
---
 storage/local/chunk.go       |  1 +
 storage/local/delta.go       | 16 +++++++++-
 storage/local/doubledelta.go | 18 +++++++++--
 storage/local/persistence.go | 61 ++++++++++++++++++++++--------------
 4 files changed, 70 insertions(+), 26 deletions(-)

diff --git a/storage/local/chunk.go b/storage/local/chunk.go
index 831b7e2a2..226e8ebb2 100644
--- a/storage/local/chunk.go
+++ b/storage/local/chunk.go
@@ -220,6 +220,7 @@ type chunk interface {
 	firstTime() model.Time
 	newIterator() chunkIterator
 	marshal(io.Writer) error
+	marshalToBuf([]byte) error
 	unmarshal(io.Reader) error
 	unmarshalFromBuf([]byte)
 	encoding() chunkEncoding
diff --git a/storage/local/delta.go b/storage/local/delta.go
index 5db3df59a..5d069afd6 100644
--- a/storage/local/delta.go
+++ b/storage/local/delta.go
@@ -218,7 +218,21 @@ func (c deltaEncodedChunk) marshal(w io.Writer) error {
 		return err
 	}
 	if n != cap(c) {
-		return fmt.Errorf("wanted to write %d bytes, wrote %d", len(c), n)
+		return fmt.Errorf("wanted to write %d bytes, wrote %d", cap(c), n)
+	}
+	return nil
+}
+
+// marshalToBuf implements chunk.
+func (c deltaEncodedChunk) marshalToBuf(buf []byte) error {
+	if len(c) > math.MaxUint16 {
+		panic("chunk buffer length would overflow a 16 bit uint")
+	}
+	binary.LittleEndian.PutUint16(c[deltaHeaderBufLenOffset:], uint16(len(c)))
+
+	n := copy(buf, c)
+	if n != len(c) {
+		return fmt.Errorf("wanted to copy %d bytes to buffer, copied %d", len(c), n)
 	}
 	return nil
 }
diff --git a/storage/local/doubledelta.go b/storage/local/doubledelta.go
index d2a983689..eaf3093cb 100644
--- a/storage/local/doubledelta.go
+++ b/storage/local/doubledelta.go
@@ -217,7 +217,7 @@ func (c *doubleDeltaEncodedChunk) newIterator() chunkIterator {
 // marshal implements chunk.
 func (c doubleDeltaEncodedChunk) marshal(w io.Writer) error {
 	if len(c) > math.MaxUint16 {
-		panic("chunk buffer length would overflow a 16 bit uint.")
+		panic("chunk buffer length would overflow a 16 bit uint")
 	}
 	binary.LittleEndian.PutUint16(c[doubleDeltaHeaderBufLenOffset:], uint16(len(c)))
 
@@ -226,7 +226,21 @@ func (c doubleDeltaEncodedChunk) marshal(w io.Writer) error {
 		return err
 	}
 	if n != cap(c) {
-		return fmt.Errorf("wanted to write %d bytes, wrote %d", len(c), n)
+		return fmt.Errorf("wanted to write %d bytes, wrote %d", cap(c), n)
+	}
+	return nil
+}
+
+// marshalToBuf implements chunk.
+func (c doubleDeltaEncodedChunk) marshalToBuf(buf []byte) error {
+	if len(c) > math.MaxUint16 {
+		panic("chunk buffer length would overflow a 16 bit uint")
+	}
+	binary.LittleEndian.PutUint16(c[doubleDeltaHeaderBufLenOffset:], uint16(len(c)))
+
+	n := copy(buf, c)
+	if n != len(c) {
+		return fmt.Errorf("wanted to copy %d bytes to buffer, copied %d", len(c), n)
 	}
 	return nil
 }
diff --git a/storage/local/persistence.go b/storage/local/persistence.go
index ace58b6d9..efd0c59c0 100644
--- a/storage/local/persistence.go
+++ b/storage/local/persistence.go
@@ -67,7 +67,8 @@
 	chunkHeaderFirstTimeOffset = 1
 	chunkHeaderLastTimeOffset  = 9
 	chunkLenWithHeader         = chunkLen + chunkHeaderLen
-	chunkMaxBatchSize          = 64 // How many chunks to load at most in one batch.
+	chunkMaxBatchSize          = 62 // Max no. of chunks to load or write in one batch.
+	// Note that 62 is the largest number of chunks that fit into 64kiB on disk.
 
 	indexingMaxBatchSize = 1024 * 1024
 	indexingBatchTimeout = 500 * time.Millisecond // Commit batch when idle for that long.
@@ -380,7 +381,7 @@ func (p *persistence) persistChunks(fp model.Fingerprint, chunks []chunk) (index
 	}
 	defer p.closeChunkFile(f)
 
-	if err := writeChunks(f, chunks); err != nil {
+	if err := p.writeChunks(f, chunks); err != nil {
 		return -1, err
 	}
 
@@ -412,8 +413,8 @@ func (p *persistence) loadChunks(fp model.Fingerprint, indexes []int, indexOffse
 	chunks := make([]chunk, 0, len(indexes))
 	buf := p.bufPool.Get().([]byte)
 	defer func() {
-		// buf may change below, so wrap returning to the pool in a function.
-		// A simple 'defer p.bufPool.Put(buf)' would only return the original buf.
+		// buf may change below. An unwrapped 'defer p.bufPool.Put(buf)'
+		// would only put back the original buf.
 		p.bufPool.Put(buf)
 	}()
 
@@ -1031,7 +1032,7 @@ func (p *persistence) dropAndPersistChunks(
 	offset = int(written / chunkLenWithHeader)
 
 	if len(chunks) > 0 {
-		if err = writeChunks(temp, chunks); err != nil {
+		if err = p.writeChunks(temp, chunks); err != nil {
 			return
 		}
 	}
@@ -1585,6 +1586,37 @@ func (p *persistence) loadFPMappings() (fpMappings, model.Fingerprint, error) {
 	return fpm, highestMappedFP, nil
 }
 
+func (p *persistence) writeChunks(w io.Writer, chunks []chunk) error {
+	b := p.bufPool.Get().([]byte)
+	defer func() {
+		// b may change below. An unwrapped 'defer p.bufPool.Put(b)'
+		// would only put back the original b.
+		p.bufPool.Put(b)
+	}()
+
+	for batchSize := chunkMaxBatchSize; len(chunks) > 0; chunks = chunks[batchSize:] {
+		if batchSize > len(chunks) {
+			batchSize = len(chunks)
+		}
+		writeSize := batchSize * chunkLenWithHeader
+		if cap(b) < writeSize {
+			b = make([]byte, writeSize)
+		}
+		b = b[:writeSize]
+
+		for i, chunk := range chunks[:batchSize] {
+			writeChunkHeader(b[i*chunkLenWithHeader:], chunk)
+			if err := chunk.marshalToBuf(b[i*chunkLenWithHeader+chunkHeaderLen:]); err != nil {
+				return err
+			}
+		}
+		if _, err := w.Write(b); err != nil {
+			return err
+		}
+	}
+	return nil
+}
+
 func offsetForChunkIndex(i int) int64 {
 	return int64(i * chunkLenWithHeader)
 }
@@ -1599,8 +1631,7 @@ func chunkIndexForOffset(offset int64) (int, error) {
 	return int(offset) / chunkLenWithHeader, nil
 }
 
-func writeChunkHeader(w io.Writer, c chunk) error {
-	header := make([]byte, chunkHeaderLen)
+func writeChunkHeader(header []byte, c chunk) {
 	header[chunkHeaderTypeOffset] = byte(c.encoding())
 	binary.LittleEndian.PutUint64(
 		header[chunkHeaderFirstTimeOffset:],
@@ -1610,20 +1641,4 @@ func writeChunkHeader(w io.Writer, c chunk) error {
 		header[chunkHeaderLastTimeOffset:],
 		uint64(c.newIterator().lastTimestamp()),
 	)
-	_, err := w.Write(header)
-	return err
-}
-
-func writeChunks(w io.Writer, chunks []chunk) error {
-	b := bufio.NewWriterSize(w, len(chunks)*chunkLenWithHeader)
-	for _, chunk := range chunks {
-		if err := writeChunkHeader(b, chunk); err != nil {
-			return err
-		}
-
-		if err := chunk.marshal(b); err != nil {
-			return err
-		}
-	}
-	return b.Flush()
 }
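
The new p.writeChunks above drops the bufio.Writer and instead marshals each batch of up to 62 chunks, headers included, into a byte slice taken from p.bufPool, issuing a single Write per batch. The following standalone Go sketch shows that pool-backed batching pattern in isolation. It is not part of the patch; recordLen, maxBatch, record, and writeRecords are invented names, and the 1041-byte record size is an assumption chosen to be consistent with the comment that 62 chunks are the most that fit into 64kiB on disk.

// Standalone sketch of the buffer-reuse pattern used by writeChunks; all
// constants and names below are illustrative assumptions, not Prometheus API.
package main

import (
	"bytes"
	"fmt"
	"io"
	"sync"
)

const (
	recordLen = 1041 // assumed fixed on-disk size of one header+chunk record
	maxBatch  = 62   // 62*1041 = 64542 bytes, the largest batch that stays under 64KiB
)

// record stands in for one marshalled chunk plus its header.
type record [recordLen]byte

// bufPool hands out reusable byte slices, as p.bufPool does in persistence.go.
var bufPool = sync.Pool{
	New: func() interface{} { return make([]byte, 0, maxBatch*recordLen) },
}

// writeRecords writes records in batches through one pooled buffer, so the
// steady state allocates nothing and issues a single Write per batch.
func writeRecords(w io.Writer, recs []record) error {
	b := bufPool.Get().([]byte)
	defer func() {
		// b may be reallocated below; put back whatever we ended up with.
		bufPool.Put(b)
	}()

	for batch := maxBatch; len(recs) > 0; recs = recs[batch:] {
		if batch > len(recs) {
			batch = len(recs) // final, possibly short, batch
		}
		size := batch * recordLen
		if cap(b) < size {
			b = make([]byte, size) // grow only if the pooled buffer is too small
		}
		b = b[:size]

		for i := range recs[:batch] {
			copy(b[i*recordLen:(i+1)*recordLen], recs[i][:]) // marshal record i into its slot
		}
		if _, err := w.Write(b); err != nil {
			return err
		}
	}
	return nil
}

func main() {
	var out bytes.Buffer
	if err := writeRecords(&out, make([]record, 100)); err != nil {
		panic(err)
	}
	fmt.Println(out.Len()) // 104100 bytes: one full batch of 62, then a batch of 38
}

Growing the buffer only when cap(b) is too small and re-slicing it to the current batch size lets one pooled allocation serve batches of different sizes, which is the allocation saving the commit message refers to.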