chunk: make reader accept abstract ByteSlice
This commit is contained in:
parent
b7c3cfecbf
commit
d578b10d5c
2
block.go
2
block.go
|
@ -156,7 +156,7 @@ func OpenBlock(dir string, pool chunks.Pool) (*Block, error) {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
||||||
cr, err := newChunkReader(chunkDir(dir), pool)
|
cr, err := NewDirChunkReader(chunkDir(dir), pool)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
64
chunks.go
64
chunks.go
|
@ -298,7 +298,7 @@ type ChunkReader interface {
|
||||||
// of series data.
|
// of series data.
|
||||||
type chunkReader struct {
|
type chunkReader struct {
|
||||||
// The underlying bytes holding the encoded series data.
|
// The underlying bytes holding the encoded series data.
|
||||||
bs [][]byte
|
bs []ByteSlice
|
||||||
|
|
||||||
// Closers for resources behind the byte slices.
|
// Closers for resources behind the byte slices.
|
||||||
cs []io.Closer
|
cs []io.Closer
|
||||||
|
@ -306,8 +306,32 @@ type chunkReader struct {
|
||||||
pool chunks.Pool
|
pool chunks.Pool
|
||||||
}
|
}
|
||||||
|
|
||||||
// newChunkReader returns a new chunkReader based on mmaped files found in dir.
|
func newChunkReader(bs []ByteSlice, cs []io.Closer, pool chunks.Pool) (*chunkReader, error) {
|
||||||
func newChunkReader(dir string, pool chunks.Pool) (*chunkReader, error) {
|
cr := chunkReader{pool: pool, bs: bs, cs: cs}
|
||||||
|
|
||||||
|
for i, b := range cr.bs {
|
||||||
|
if b.Len() < 4 {
|
||||||
|
return nil, errors.Wrapf(errInvalidSize, "validate magic in segment %d", i)
|
||||||
|
}
|
||||||
|
// Verify magic number.
|
||||||
|
if m := binary.BigEndian.Uint32(b.Range(0, 4)); m != MagicChunks {
|
||||||
|
return nil, fmt.Errorf("invalid magic number %x", m)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return &cr, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// NewChunkReader returns a new chunk reader against the given byte slices.
|
||||||
|
func NewChunkReader(bs []ByteSlice, pool chunks.Pool) ChunkReader {
|
||||||
|
if pool == nil {
|
||||||
|
pool = chunks.NewPool()
|
||||||
|
}
|
||||||
|
return &chunkReader{bs: bs, pool: pool}
|
||||||
|
}
|
||||||
|
|
||||||
|
// NewDirChunkReader returns a new ChunkReader against sequentially numbered files in the
|
||||||
|
// given directory.
|
||||||
|
func NewDirChunkReader(dir string, pool chunks.Pool) (ChunkReader, error) {
|
||||||
files, err := sequenceFiles(dir)
|
files, err := sequenceFiles(dir)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
|
@ -315,27 +339,19 @@ func newChunkReader(dir string, pool chunks.Pool) (*chunkReader, error) {
|
||||||
if pool == nil {
|
if pool == nil {
|
||||||
pool = chunks.NewPool()
|
pool = chunks.NewPool()
|
||||||
}
|
}
|
||||||
cr := chunkReader{pool: pool}
|
|
||||||
|
var bs []ByteSlice
|
||||||
|
var cs []io.Closer
|
||||||
|
|
||||||
for _, fn := range files {
|
for _, fn := range files {
|
||||||
f, err := openMmapFile(fn)
|
f, err := openMmapFile(fn)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, errors.Wrapf(err, "mmap files")
|
return nil, errors.Wrapf(err, "mmap files")
|
||||||
}
|
}
|
||||||
cr.cs = append(cr.cs, f)
|
cs = append(cs, f)
|
||||||
cr.bs = append(cr.bs, f.b)
|
bs = append(bs, realByteSlice(f.b))
|
||||||
}
|
}
|
||||||
|
return newChunkReader(bs, cs, pool)
|
||||||
for i, b := range cr.bs {
|
|
||||||
if len(b) < 4 {
|
|
||||||
return nil, errors.Wrapf(errInvalidSize, "validate magic in segment %d", i)
|
|
||||||
}
|
|
||||||
// Verify magic number.
|
|
||||||
if m := binary.BigEndian.Uint32(b[:4]); m != MagicChunks {
|
|
||||||
return nil, fmt.Errorf("invalid magic number %x", m)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
return &cr, nil
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *chunkReader) Close() error {
|
func (s *chunkReader) Close() error {
|
||||||
|
@ -352,16 +368,18 @@ func (s *chunkReader) Chunk(ref uint64) (chunks.Chunk, error) {
|
||||||
}
|
}
|
||||||
b := s.bs[seq]
|
b := s.bs[seq]
|
||||||
|
|
||||||
if int(off) >= len(b) {
|
if int(off) >= b.Len() {
|
||||||
return nil, errors.Errorf("offset %d beyond data size %d", off, len(b))
|
return nil, errors.Errorf("offset %d beyond data size %d", off, b.Len())
|
||||||
}
|
}
|
||||||
b = b[off:]
|
// With the minimum chunk length this should never cause us reading
|
||||||
|
// over the end of the slice.
|
||||||
|
r := b.Range(off, off+binary.MaxVarintLen32)
|
||||||
|
|
||||||
l, n := binary.Uvarint(b)
|
l, n := binary.Uvarint(r)
|
||||||
if n < 0 {
|
if n < 0 {
|
||||||
return nil, fmt.Errorf("reading chunk length failed")
|
return nil, fmt.Errorf("reading chunk length failed")
|
||||||
}
|
}
|
||||||
b = b[n:]
|
r = b.Range(off+n, off+n+int(l))
|
||||||
|
|
||||||
return s.pool.Get(chunks.Encoding(b[0]), b[1:1+l])
|
return s.pool.Get(chunks.Encoding(r[0]), r[1:1+l])
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue