From d578b10d5cbcfd481b3c9cb82e028aa4f649a470 Mon Sep 17 00:00:00 2001 From: Fabian Reinartz Date: Fri, 10 Nov 2017 10:38:22 +0000 Subject: [PATCH] chunk: make reader accept abstract ByteSlice --- block.go | 2 +- chunks.go | 64 +++++++++++++++++++++++++++++++++++-------------------- 2 files changed, 42 insertions(+), 24 deletions(-) diff --git a/block.go b/block.go index 2d7bab2d8..034f33673 100644 --- a/block.go +++ b/block.go @@ -156,7 +156,7 @@ func OpenBlock(dir string, pool chunks.Pool) (*Block, error) { return nil, err } - cr, err := newChunkReader(chunkDir(dir), pool) + cr, err := NewDirChunkReader(chunkDir(dir), pool) if err != nil { return nil, err } diff --git a/chunks.go b/chunks.go index f6e329b79..bab0a0bad 100644 --- a/chunks.go +++ b/chunks.go @@ -298,7 +298,7 @@ type ChunkReader interface { // of series data. type chunkReader struct { // The underlying bytes holding the encoded series data. - bs [][]byte + bs []ByteSlice // Closers for resources behind the byte slices. cs []io.Closer @@ -306,8 +306,32 @@ type chunkReader struct { pool chunks.Pool } -// newChunkReader returns a new chunkReader based on mmaped files found in dir. -func newChunkReader(dir string, pool chunks.Pool) (*chunkReader, error) { +func newChunkReader(bs []ByteSlice, cs []io.Closer, pool chunks.Pool) (*chunkReader, error) { + cr := chunkReader{pool: pool, bs: bs, cs: cs} + + for i, b := range cr.bs { + if b.Len() < 4 { + return nil, errors.Wrapf(errInvalidSize, "validate magic in segment %d", i) + } + // Verify magic number. + if m := binary.BigEndian.Uint32(b.Range(0, 4)); m != MagicChunks { + return nil, fmt.Errorf("invalid magic number %x", m) + } + } + return &cr, nil +} + +// NewChunkReader returns a new chunk reader against the given byte slices. +func NewChunkReader(bs []ByteSlice, pool chunks.Pool) ChunkReader { + if pool == nil { + pool = chunks.NewPool() + } + return &chunkReader{bs: bs, pool: pool} +} + +// NewDirChunkReader returns a new ChunkReader against sequentially numbered files in the +// given directory. +func NewDirChunkReader(dir string, pool chunks.Pool) (ChunkReader, error) { files, err := sequenceFiles(dir) if err != nil { return nil, err @@ -315,27 +339,19 @@ func newChunkReader(dir string, pool chunks.Pool) (*chunkReader, error) { if pool == nil { pool = chunks.NewPool() } - cr := chunkReader{pool: pool} + + var bs []ByteSlice + var cs []io.Closer for _, fn := range files { f, err := openMmapFile(fn) if err != nil { return nil, errors.Wrapf(err, "mmap files") } - cr.cs = append(cr.cs, f) - cr.bs = append(cr.bs, f.b) + cs = append(cs, f) + bs = append(bs, realByteSlice(f.b)) } - - for i, b := range cr.bs { - if len(b) < 4 { - return nil, errors.Wrapf(errInvalidSize, "validate magic in segment %d", i) - } - // Verify magic number. - if m := binary.BigEndian.Uint32(b[:4]); m != MagicChunks { - return nil, fmt.Errorf("invalid magic number %x", m) - } - } - return &cr, nil + return newChunkReader(bs, cs, pool) } func (s *chunkReader) Close() error { @@ -352,16 +368,18 @@ func (s *chunkReader) Chunk(ref uint64) (chunks.Chunk, error) { } b := s.bs[seq] - if int(off) >= len(b) { - return nil, errors.Errorf("offset %d beyond data size %d", off, len(b)) + if int(off) >= b.Len() { + return nil, errors.Errorf("offset %d beyond data size %d", off, b.Len()) } - b = b[off:] + // With the minimum chunk length this should never cause us reading + // over the end of the slice. + r := b.Range(off, off+binary.MaxVarintLen32) - l, n := binary.Uvarint(b) + l, n := binary.Uvarint(r) if n < 0 { return nil, fmt.Errorf("reading chunk length failed") } - b = b[n:] + r = b.Range(off+n, off+n+int(l)) - return s.pool.Get(chunks.Encoding(b[0]), b[1:1+l]) + return s.pool.Get(chunks.Encoding(r[0]), r[1:1+l]) }