Merge pull request #1342 from prometheus/beorn7/storage

Streamline chunk writing
This commit is contained in:
Björn Rabenstein 2016-01-25 18:58:04 +01:00
commit 3214cb9529
4 changed files with 71 additions and 26 deletions

View File

@ -220,6 +220,7 @@ type chunk interface {
firstTime() model.Time firstTime() model.Time
newIterator() chunkIterator newIterator() chunkIterator
marshal(io.Writer) error marshal(io.Writer) error
marshalToBuf([]byte) error
unmarshal(io.Reader) error unmarshal(io.Reader) error
unmarshalFromBuf([]byte) unmarshalFromBuf([]byte)
encoding() chunkEncoding encoding() chunkEncoding

View File

@ -218,7 +218,21 @@ func (c deltaEncodedChunk) marshal(w io.Writer) error {
return err return err
} }
if n != cap(c) { if n != cap(c) {
return fmt.Errorf("wanted to write %d bytes, wrote %d", len(c), n) return fmt.Errorf("wanted to write %d bytes, wrote %d", cap(c), n)
}
return nil
}
// marshalToBuf implements chunk.
func (c deltaEncodedChunk) marshalToBuf(buf []byte) error {
if len(c) > math.MaxUint16 {
panic("chunk buffer length would overflow a 16 bit uint")
}
binary.LittleEndian.PutUint16(c[deltaHeaderBufLenOffset:], uint16(len(c)))
n := copy(buf, c)
if n != len(c) {
return fmt.Errorf("wanted to copy %d bytes to buffer, copied %d", len(c), n)
} }
return nil return nil
} }

View File

@ -217,7 +217,7 @@ func (c *doubleDeltaEncodedChunk) newIterator() chunkIterator {
// marshal implements chunk. // marshal implements chunk.
func (c doubleDeltaEncodedChunk) marshal(w io.Writer) error { func (c doubleDeltaEncodedChunk) marshal(w io.Writer) error {
if len(c) > math.MaxUint16 { if len(c) > math.MaxUint16 {
panic("chunk buffer length would overflow a 16 bit uint.") panic("chunk buffer length would overflow a 16 bit uint")
} }
binary.LittleEndian.PutUint16(c[doubleDeltaHeaderBufLenOffset:], uint16(len(c))) binary.LittleEndian.PutUint16(c[doubleDeltaHeaderBufLenOffset:], uint16(len(c)))
@ -226,7 +226,21 @@ func (c doubleDeltaEncodedChunk) marshal(w io.Writer) error {
return err return err
} }
if n != cap(c) { if n != cap(c) {
return fmt.Errorf("wanted to write %d bytes, wrote %d", len(c), n) return fmt.Errorf("wanted to write %d bytes, wrote %d", cap(c), n)
}
return nil
}
// marshalToBuf implements chunk.
func (c doubleDeltaEncodedChunk) marshalToBuf(buf []byte) error {
if len(c) > math.MaxUint16 {
panic("chunk buffer length would overflow a 16 bit uint")
}
binary.LittleEndian.PutUint16(c[doubleDeltaHeaderBufLenOffset:], uint16(len(c)))
n := copy(buf, c)
if n != len(c) {
return fmt.Errorf("wanted to copy %d bytes to buffer, copied %d", len(c), n)
} }
return nil return nil
} }

View File

@ -67,7 +67,9 @@ const (
chunkHeaderFirstTimeOffset = 1 chunkHeaderFirstTimeOffset = 1
chunkHeaderLastTimeOffset = 9 chunkHeaderLastTimeOffset = 9
chunkLenWithHeader = chunkLen + chunkHeaderLen chunkLenWithHeader = chunkLen + chunkHeaderLen
chunkMaxBatchSize = 64 // How many chunks to load at most in one batch. chunkMaxBatchSize = 62 // Max no. of chunks to load or write in
// one batch. Note that 62 is the largest number of chunks that fit
// into 64kiB on disk because chunkHeaderLen is added to each 1k chunk.
indexingMaxBatchSize = 1024 * 1024 indexingMaxBatchSize = 1024 * 1024
indexingBatchTimeout = 500 * time.Millisecond // Commit batch when idle for that long. indexingBatchTimeout = 500 * time.Millisecond // Commit batch when idle for that long.
@ -380,7 +382,7 @@ func (p *persistence) persistChunks(fp model.Fingerprint, chunks []chunk) (index
} }
defer p.closeChunkFile(f) defer p.closeChunkFile(f)
if err := writeChunks(f, chunks); err != nil { if err := p.writeChunks(f, chunks); err != nil {
return -1, err return -1, err
} }
@ -412,8 +414,8 @@ func (p *persistence) loadChunks(fp model.Fingerprint, indexes []int, indexOffse
chunks := make([]chunk, 0, len(indexes)) chunks := make([]chunk, 0, len(indexes))
buf := p.bufPool.Get().([]byte) buf := p.bufPool.Get().([]byte)
defer func() { defer func() {
// buf may change below, so wrap returning to the pool in a function. // buf may change below. An unwrapped 'defer p.bufPool.Put(buf)'
// A simple 'defer p.bufPool.Put(buf)' would only return the original buf. // would only put back the original buf.
p.bufPool.Put(buf) p.bufPool.Put(buf)
}() }()
@ -1031,7 +1033,7 @@ func (p *persistence) dropAndPersistChunks(
offset = int(written / chunkLenWithHeader) offset = int(written / chunkLenWithHeader)
if len(chunks) > 0 { if len(chunks) > 0 {
if err = writeChunks(temp, chunks); err != nil { if err = p.writeChunks(temp, chunks); err != nil {
return return
} }
} }
@ -1585,6 +1587,37 @@ func (p *persistence) loadFPMappings() (fpMappings, model.Fingerprint, error) {
return fpm, highestMappedFP, nil return fpm, highestMappedFP, nil
} }
func (p *persistence) writeChunks(w io.Writer, chunks []chunk) error {
b := p.bufPool.Get().([]byte)
defer func() {
// buf may change below. An unwrapped 'defer p.bufPool.Put(buf)'
// would only put back the original buf.
p.bufPool.Put(b)
}()
for batchSize := chunkMaxBatchSize; len(chunks) > 0; chunks = chunks[batchSize:] {
if batchSize > len(chunks) {
batchSize = len(chunks)
}
writeSize := batchSize * chunkLenWithHeader
if cap(b) < writeSize {
b = make([]byte, writeSize)
}
b = b[:writeSize]
for i, chunk := range chunks[:batchSize] {
writeChunkHeader(b[i*chunkLenWithHeader:], chunk)
if err := chunk.marshalToBuf(b[i*chunkLenWithHeader+chunkHeaderLen:]); err != nil {
return err
}
}
if _, err := w.Write(b); err != nil {
return err
}
}
return nil
}
func offsetForChunkIndex(i int) int64 { func offsetForChunkIndex(i int) int64 {
return int64(i * chunkLenWithHeader) return int64(i * chunkLenWithHeader)
} }
@ -1599,8 +1632,7 @@ func chunkIndexForOffset(offset int64) (int, error) {
return int(offset) / chunkLenWithHeader, nil return int(offset) / chunkLenWithHeader, nil
} }
func writeChunkHeader(w io.Writer, c chunk) error { func writeChunkHeader(header []byte, c chunk) {
header := make([]byte, chunkHeaderLen)
header[chunkHeaderTypeOffset] = byte(c.encoding()) header[chunkHeaderTypeOffset] = byte(c.encoding())
binary.LittleEndian.PutUint64( binary.LittleEndian.PutUint64(
header[chunkHeaderFirstTimeOffset:], header[chunkHeaderFirstTimeOffset:],
@ -1610,20 +1642,4 @@ func writeChunkHeader(w io.Writer, c chunk) error {
header[chunkHeaderLastTimeOffset:], header[chunkHeaderLastTimeOffset:],
uint64(c.newIterator().lastTimestamp()), uint64(c.newIterator().lastTimestamp()),
) )
_, err := w.Write(header)
return err
}
func writeChunks(w io.Writer, chunks []chunk) error {
b := bufio.NewWriterSize(w, len(chunks)*chunkLenWithHeader)
for _, chunk := range chunks {
if err := writeChunkHeader(b, chunk); err != nil {
return err
}
if err := chunk.marshal(b); err != nil {
return err
}
}
return b.Flush()
} }