Merge pull request #203 from prometheus/absbytes

Generalize index and chunk readers
This commit is contained in:
Fabian Reinartz 2017-11-10 14:10:41 +00:00 committed by GitHub
commit 1d08a00d63
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 235 additions and 159 deletions

View File

@ -142,8 +142,8 @@ type Block struct {
dir string dir string
meta BlockMeta meta BlockMeta
chunkr *chunkReader chunkr ChunkReader
indexr *indexReader indexr IndexReader
tombstones tombstoneReader tombstones tombstoneReader
} }
@ -156,11 +156,11 @@ func OpenBlock(dir string, pool chunks.Pool) (*Block, error) {
return nil, err return nil, err
} }
cr, err := newChunkReader(chunkDir(dir), pool) cr, err := NewDirChunkReader(chunkDir(dir), pool)
if err != nil { if err != nil {
return nil, err return nil, err
} }
ir, err := newIndexReader(dir) ir, err := NewFileIndexReader(filepath.Join(dir, "index"))
if err != nil { if err != nil {
return nil, err return nil, err
} }

View File

@ -298,7 +298,7 @@ type ChunkReader interface {
// of series data. // of series data.
type chunkReader struct { type chunkReader struct {
// The underlying bytes holding the encoded series data. // The underlying bytes holding the encoded series data.
bs [][]byte bs []ByteSlice
// Closers for resources behind the byte slices. // Closers for resources behind the byte slices.
cs []io.Closer cs []io.Closer
@ -306,8 +306,32 @@ type chunkReader struct {
pool chunks.Pool pool chunks.Pool
} }
// newChunkReader returns a new chunkReader based on mmaped files found in dir. func newChunkReader(bs []ByteSlice, cs []io.Closer, pool chunks.Pool) (*chunkReader, error) {
func newChunkReader(dir string, pool chunks.Pool) (*chunkReader, error) { cr := chunkReader{pool: pool, bs: bs, cs: cs}
for i, b := range cr.bs {
if b.Len() < 4 {
return nil, errors.Wrapf(errInvalidSize, "validate magic in segment %d", i)
}
// Verify magic number.
if m := binary.BigEndian.Uint32(b.Range(0, 4)); m != MagicChunks {
return nil, fmt.Errorf("invalid magic number %x", m)
}
}
return &cr, nil
}
// NewChunkReader returns a new chunk reader against the given byte slices.
func NewChunkReader(bs []ByteSlice, pool chunks.Pool) (ChunkReader, error) {
if pool == nil {
pool = chunks.NewPool()
}
return newChunkReader(bs, nil, pool)
}
// NewDirChunkReader returns a new ChunkReader against sequentially numbered files in the
// given directory.
func NewDirChunkReader(dir string, pool chunks.Pool) (ChunkReader, error) {
files, err := sequenceFiles(dir) files, err := sequenceFiles(dir)
if err != nil { if err != nil {
return nil, err return nil, err
@ -315,27 +339,19 @@ func newChunkReader(dir string, pool chunks.Pool) (*chunkReader, error) {
if pool == nil { if pool == nil {
pool = chunks.NewPool() pool = chunks.NewPool()
} }
cr := chunkReader{pool: pool}
var bs []ByteSlice
var cs []io.Closer
for _, fn := range files { for _, fn := range files {
f, err := openMmapFile(fn) f, err := openMmapFile(fn)
if err != nil { if err != nil {
return nil, errors.Wrapf(err, "mmap files") return nil, errors.Wrapf(err, "mmap files")
} }
cr.cs = append(cr.cs, f) cs = append(cs, f)
cr.bs = append(cr.bs, f.b) bs = append(bs, realByteSlice(f.b))
} }
return newChunkReader(bs, cs, pool)
for i, b := range cr.bs {
if len(b) < 4 {
return nil, errors.Wrapf(errInvalidSize, "validate magic in segment %d", i)
}
// Verify magic number.
if m := binary.BigEndian.Uint32(b[:4]); m != MagicChunks {
return nil, fmt.Errorf("invalid magic number %x", m)
}
}
return &cr, nil
} }
func (s *chunkReader) Close() error { func (s *chunkReader) Close() error {
@ -352,16 +368,18 @@ func (s *chunkReader) Chunk(ref uint64) (chunks.Chunk, error) {
} }
b := s.bs[seq] b := s.bs[seq]
if int(off) >= len(b) { if int(off) >= b.Len() {
return nil, errors.Errorf("offset %d beyond data size %d", off, len(b)) return nil, errors.Errorf("offset %d beyond data size %d", off, b.Len())
} }
b = b[off:] // With the minimum chunk length this should never cause us reading
// over the end of the slice.
r := b.Range(off, off+binary.MaxVarintLen32)
l, n := binary.Uvarint(b) l, n := binary.Uvarint(r)
if n < 0 { if n < 0 {
return nil, fmt.Errorf("reading chunk length failed") return nil, fmt.Errorf("reading chunk length failed")
} }
b = b[n:] r = b.Range(off+n, off+n+int(l))
return s.pool.Get(chunks.Encoding(b[0]), b[1:1+l]) return s.pool.Get(chunks.Encoding(r[0]), r[1:1+l])
} }

View File

@ -3,6 +3,7 @@ package tsdb
import ( import (
"encoding/binary" "encoding/binary"
"hash" "hash"
"hash/crc32"
"unsafe" "unsafe"
) )
@ -77,6 +78,11 @@ func (d *decbuf) uvarint32() uint32 { return uint32(d.uvarint64()) }
func (d *decbuf) be32int() int { return int(d.be32()) } func (d *decbuf) be32int() int { return int(d.be32()) }
func (d *decbuf) be64int64() int64 { return int64(d.be64()) } func (d *decbuf) be64int64() int64 { return int64(d.be64()) }
// crc32 returns a CRC32 checksum over the remaining bytes.
func (d *decbuf) crc32() uint32 {
return crc32.Checksum(d.b, castagnoliTable)
}
func (d *decbuf) uvarintStr() string { func (d *decbuf) uvarintStr() string {
l := d.uvarint64() l := d.uvarint64()
if d.e != nil { if d.e != nil {

262
index.go
View File

@ -560,7 +560,7 @@ type StringTuples interface {
type indexReader struct { type indexReader struct {
// The underlying byte slice holding the encoded series data. // The underlying byte slice holding the encoded series data.
b []byte b ByteSlice
toc indexTOC toc indexTOC
// Close that releases the underlying resources of the byte slice. // Close that releases the underlying resources of the byte slice.
@ -585,27 +585,52 @@ var (
errInvalidChecksum = fmt.Errorf("invalid checksum") errInvalidChecksum = fmt.Errorf("invalid checksum")
) )
// NewIndexReader returns a new IndexReader on the given directory. // ByteSlice abstracts a byte slice.
func NewIndexReader(dir string) (IndexReader, error) { return newIndexReader(dir) } type ByteSlice interface {
Len() int
Range(start, end int) []byte
}
// newIndexReader returns a new indexReader on the given directory. type realByteSlice []byte
func newIndexReader(dir string) (*indexReader, error) {
f, err := openMmapFile(filepath.Join(dir, "index")) func (b realByteSlice) Len() int {
return len(b)
}
func (b realByteSlice) Range(start, end int) []byte {
return b[start:end]
}
func (b realByteSlice) Sub(start, end int) ByteSlice {
return b[start:end]
}
// NewIndexReader returns a new IndexReader on the given byte slice.
func NewIndexReader(b ByteSlice) (IndexReader, error) {
return newIndexReader(b, nil)
}
// NewFileIndexReader returns a new index reader against the given index file.
func NewFileIndexReader(path string) (IndexReader, error) {
f, err := openMmapFile(path)
if err != nil { if err != nil {
return nil, err return nil, err
} }
return newIndexReader(realByteSlice(f.b), f)
}
func newIndexReader(b ByteSlice, c io.Closer) (*indexReader, error) {
r := &indexReader{ r := &indexReader{
b: f.b, b: b,
c: f, c: c,
symbols: map[uint32]string{}, symbols: map[uint32]string{},
crc32: newCRC32(), crc32: newCRC32(),
} }
// Verify magic number. // Verify magic number.
if len(f.b) < 4 { if b.Len() < 4 {
return nil, errors.Wrap(errInvalidSize, "index header") return nil, errors.Wrap(errInvalidSize, "index header")
} }
if m := binary.BigEndian.Uint32(r.b[:4]); m != MagicIndex { if m := binary.BigEndian.Uint32(r.b.Range(0, 4)); m != MagicIndex {
return nil, errors.Errorf("invalid magic number %x", m) return nil, errors.Errorf("invalid magic number %x", m)
} }
@ -615,6 +640,7 @@ func newIndexReader(dir string) (*indexReader, error) {
if err := r.readSymbols(int(r.toc.symbols)); err != nil { if err := r.readSymbols(int(r.toc.symbols)); err != nil {
return nil, errors.Wrap(err, "read symbols") return nil, errors.Wrap(err, "read symbols")
} }
var err error
r.labels, err = r.readOffsetTable(r.toc.labelIndicesTable) r.labels, err = r.readOffsetTable(r.toc.labelIndicesTable)
if err != nil { if err != nil {
@ -625,7 +651,17 @@ func newIndexReader(dir string) (*indexReader, error) {
} }
func (r *indexReader) readTOC() error { func (r *indexReader) readTOC() error {
d := r.decbufAt(len(r.b) - indexTOCLen) if r.b.Len() < indexTOCLen {
return errInvalidSize
}
b := r.b.Range(r.b.Len()-indexTOCLen, r.b.Len())
expCRC := binary.BigEndian.Uint32(b[len(b)-4:])
d := decbuf{b: b[:len(b)-4]}
if d.crc32() != expCRC {
return errInvalidChecksum
}
r.toc.symbols = d.be64() r.toc.symbols = d.be64()
r.toc.series = d.be64() r.toc.series = d.be64()
@ -634,32 +670,61 @@ func (r *indexReader) readTOC() error {
r.toc.postings = d.be64() r.toc.postings = d.be64()
r.toc.postingsTable = d.be64() r.toc.postingsTable = d.be64()
if valid, err := r.checkCRC(d.be32(), len(r.b)-indexTOCLen, indexTOCLen-4); !valid {
return errors.Wrap(err, "TOC checksum")
}
return d.err() return d.err()
} }
// decbufAt returns a new decoding buffer. It expects the first 4 bytes
// after offset to hold the big endian encoded content length, followed by the contents and the expected
// checksum.
func (r *indexReader) decbufAt(off int) decbuf { func (r *indexReader) decbufAt(off int) decbuf {
if len(r.b) < off { if r.b.Len() < off+4 {
return decbuf{e: errInvalidSize} return decbuf{e: errInvalidSize}
} }
return decbuf{b: r.b[off:]} b := r.b.Range(off, off+4)
l := int(binary.BigEndian.Uint32(b))
if r.b.Len() < off+4+l+4 {
return decbuf{e: errInvalidSize}
} }
func (r *indexReader) checkCRC(crc uint32, off, cnt int) (bool, error) { // Load bytes holding the contents plus a CRC32 checksum.
r.crc32.Reset() b = r.b.Range(off+4, off+4+l+4)
if len(r.b) < off+cnt { dec := decbuf{b: b[:len(b)-4]}
return false, errInvalidSize
if exp := binary.BigEndian.Uint32(b[len(b)-4:]); dec.crc32() != exp {
return decbuf{e: errInvalidChecksum}
} }
if _, err := r.crc32.Write(r.b[off : off+cnt]); err != nil { return dec
return false, errors.Wrap(err, "write to hash")
} }
if r.crc32.Sum32() != crc {
return false, errInvalidChecksum // decbufUvarintAt returns a new decoding buffer. It expects the first bytes
// after offset to hold the uvarint-encoded buffers length, followed by the contents and the expected
// checksum.
func (r *indexReader) decbufUvarintAt(off int) decbuf {
// We never have to access this method at the far end of the byte slice. Thus just checking
// against the MaxVarintLen32 is sufficient.
if r.b.Len() < off+binary.MaxVarintLen32 {
return decbuf{e: errInvalidSize}
} }
return true, nil b := r.b.Range(off, off+binary.MaxVarintLen32)
l, n := binary.Uvarint(b)
if n > binary.MaxVarintLen32 {
return decbuf{e: errors.New("invalid uvarint")}
}
if r.b.Len() < off+n+int(l)+4 {
return decbuf{e: errInvalidSize}
}
// Load bytes holding the contents plus a CRC32 checksum.
b = r.b.Range(off+n, off+n+int(l)+4)
dec := decbuf{b: b[:len(b)-4]}
if dec.crc32() != binary.BigEndian.Uint32(b[len(b)-4:]) {
return decbuf{e: errInvalidChecksum}
}
return dec
} }
// readSymbols reads the symbol table fully into memory and allocates proper strings for them. // readSymbols reads the symbol table fully into memory and allocates proper strings for them.
@ -669,26 +734,22 @@ func (r *indexReader) readSymbols(off int) error {
if off == 0 { if off == 0 {
return nil return nil
} }
d := r.decbufAt(off)
var ( var (
d1 = r.decbufAt(int(off)) origLen = d.len()
l = d1.be32int() cnt = d.be32int()
d2 = d1.decbuf(l)
origLen = d2.len()
cnt = d2.be32int()
basePos = uint32(off) + 4 basePos = uint32(off) + 4
nextPos = basePos + uint32(origLen-d2.len()) nextPos = basePos + uint32(origLen-d.len())
) )
for d2.err() == nil && d2.len() > 0 && cnt > 0 { for d.err() == nil && d.len() > 0 && cnt > 0 {
s := d2.uvarintStr() s := d.uvarintStr()
r.symbols[uint32(nextPos)] = s r.symbols[uint32(nextPos)] = s
nextPos = basePos + uint32(origLen-d2.len()) nextPos = basePos + uint32(origLen-d.len())
cnt-- cnt--
} }
if valid, err := r.checkCRC(d1.be32(), int(off)+4, l); !valid { return d.err()
return errors.Wrap(err, "symbol table checksum")
}
return d2.err()
} }
// readOffsetTable reads an offset table at the given position and returns a map // readOffsetTable reads an offset table at the given position and returns a map
@ -696,57 +757,29 @@ func (r *indexReader) readSymbols(off int) error {
func (r *indexReader) readOffsetTable(off uint64) (map[string]uint32, error) { func (r *indexReader) readOffsetTable(off uint64) (map[string]uint32, error) {
const sep = "\xff" const sep = "\xff"
var ( d := r.decbufAt(int(off))
d1 = r.decbufAt(int(off)) cnt := d.be32()
l = d1.be32int()
d2 = d1.decbuf(l)
cnt = d2.be32()
)
res := make(map[string]uint32, 512) res := make(map[string]uint32, cnt)
for d2.err() == nil && d2.len() > 0 && cnt > 0 { for d.err() == nil && d.len() > 0 && cnt > 0 {
keyCount := int(d2.uvarint()) keyCount := int(d.uvarint())
keys := make([]string, 0, keyCount) keys := make([]string, 0, keyCount)
for i := 0; i < keyCount; i++ { for i := 0; i < keyCount; i++ {
keys = append(keys, d2.uvarintStr()) keys = append(keys, d.uvarintStr())
} }
res[strings.Join(keys, sep)] = uint32(d2.uvarint()) res[strings.Join(keys, sep)] = uint32(d.uvarint())
cnt-- cnt--
} }
return res, d.err()
if valid, err := r.checkCRC(d1.be32(), int(off)+4, l); !valid {
return res, errors.Wrap(err, "offset table checksum")
}
return res, d2.err()
} }
func (r *indexReader) Close() error { func (r *indexReader) Close() error {
return r.c.Close() return r.c.Close()
} }
func (r *indexReader) section(o uint32) (byte, []byte, error) {
b := r.b[o:]
if len(b) < 5 {
return 0, nil, errors.Wrap(errInvalidSize, "read header")
}
flag := b[0]
l := binary.BigEndian.Uint32(b[1:5])
b = b[5:]
// b must have the given length plus 4 bytes for the CRC32 checksum.
if len(b) < int(l)+4 {
return 0, nil, errors.Wrap(errInvalidSize, "section content")
}
return flag, b[:l], nil
}
func (r *indexReader) lookupSymbol(o uint32) (string, error) { func (r *indexReader) lookupSymbol(o uint32) (string, error) {
s, ok := r.symbols[o] s, ok := r.symbols[o]
if !ok { if !ok {
@ -776,24 +809,17 @@ func (r *indexReader) LabelValues(names ...string) (StringTuples, error) {
//return nil, fmt.Errorf("label index doesn't exist") //return nil, fmt.Errorf("label index doesn't exist")
} }
d1 := r.decbufAt(int(off)) d := r.decbufAt(int(off))
l := d1.be32int()
d2 := d1.decbuf(l)
nc := d2.be32int() nc := d.be32int()
d2.be32() // consume unused value entry count. d.be32() // consume unused value entry count.
if d2.err() != nil { if d.err() != nil {
return nil, errors.Wrap(d2.err(), "read label value index") return nil, errors.Wrap(d.err(), "read label value index")
} }
if valid, err := r.checkCRC(d1.be32(), int(off)+4, l); !valid {
return nil, errors.Wrap(err, "read label values checksum")
}
st := &serializedStringTuples{ st := &serializedStringTuples{
l: nc, l: nc,
b: d2.get(), b: d.get(),
lookup: r.lookupSymbol, lookup: r.lookupSymbol,
} }
return st, nil return st, nil
@ -816,22 +842,19 @@ func (r *indexReader) LabelIndices() ([][]string, error) {
} }
func (r *indexReader) Series(ref uint64, lbls *labels.Labels, chks *[]ChunkMeta) error { func (r *indexReader) Series(ref uint64, lbls *labels.Labels, chks *[]ChunkMeta) error {
d1 := r.decbufAt(int(ref)) d := r.decbufUvarintAt(int(ref))
l := d1.uvarint()
sl := len(r.b[ref:]) - d1.len() // # bytes in l
d2 := d1.decbuf(l)
*lbls = (*lbls)[:0] *lbls = (*lbls)[:0]
*chks = (*chks)[:0] *chks = (*chks)[:0]
k := int(d2.uvarint()) k := int(d.uvarint())
for i := 0; i < k; i++ { for i := 0; i < k; i++ {
lno := uint32(d2.uvarint()) lno := uint32(d.uvarint())
lvo := uint32(d2.uvarint()) lvo := uint32(d.uvarint())
if d2.err() != nil { if d.err() != nil {
return errors.Wrap(d2.err(), "read series label offsets") return errors.Wrap(d.err(), "read series label offsets")
} }
ln, err := r.lookupSymbol(lno) ln, err := r.lookupSymbol(lno)
@ -847,15 +870,15 @@ func (r *indexReader) Series(ref uint64, lbls *labels.Labels, chks *[]ChunkMeta)
} }
// Read the chunks meta data. // Read the chunks meta data.
k = int(d2.uvarint()) k = int(d.uvarint())
if k == 0 { if k == 0 {
return nil return nil
} }
t0 := d2.varint64() t0 := d.varint64()
maxt := int64(d2.uvarint64()) + t0 maxt := int64(d.uvarint64()) + t0
ref0 := int64(d2.uvarint64()) ref0 := int64(d.uvarint64())
*chks = append(*chks, ChunkMeta{ *chks = append(*chks, ChunkMeta{
Ref: uint64(ref0), Ref: uint64(ref0),
@ -865,14 +888,14 @@ func (r *indexReader) Series(ref uint64, lbls *labels.Labels, chks *[]ChunkMeta)
t0 = maxt t0 = maxt
for i := 1; i < k; i++ { for i := 1; i < k; i++ {
mint := int64(d2.uvarint64()) + t0 mint := int64(d.uvarint64()) + t0
maxt := int64(d2.uvarint64()) + mint maxt := int64(d.uvarint64()) + mint
ref0 += d2.varint64() ref0 += d.varint64()
t0 = maxt t0 = maxt
if d2.err() != nil { if d.err() != nil {
return errors.Wrapf(d2.err(), "read meta for chunk %d", i) return errors.Wrapf(d.err(), "read meta for chunk %d", i)
} }
*chks = append(*chks, ChunkMeta{ *chks = append(*chks, ChunkMeta{
@ -881,12 +904,7 @@ func (r *indexReader) Series(ref uint64, lbls *labels.Labels, chks *[]ChunkMeta)
MaxTime: maxt, MaxTime: maxt,
}) })
} }
return d.err()
if valid, err := r.checkCRC(d1.be32(), int(ref)+sl, l); !valid {
return errors.Wrap(err, "series checksum")
}
return nil
} }
func (r *indexReader) Postings(name, value string) (Postings, error) { func (r *indexReader) Postings(name, value string) (Postings, error) {
@ -897,22 +915,10 @@ func (r *indexReader) Postings(name, value string) (Postings, error) {
if !ok { if !ok {
return emptyPostings, nil return emptyPostings, nil
} }
d := r.decbufAt(int(off))
d.be32() // consume unused postings list length.
d1 := r.decbufAt(int(off)) return newBigEndianPostings(d.get()), errors.Wrap(d.err(), "get postings bytes")
l := d1.be32int()
d2 := d1.decbuf(l)
d2.be32() // consume unused postings list length.
if d2.err() != nil {
return nil, errors.Wrap(d2.err(), "get postings bytes")
}
if valid, err := r.checkCRC(d1.be32(), int(off)+4, l); !valid {
return nil, errors.Wrap(err, "postings checksum")
}
return newBigEndianPostings(d2.get()), nil
} }
func (r *indexReader) SortedPostings(p Postings) Postings { func (r *indexReader) SortedPostings(p Postings) Postings {

View File

@ -159,7 +159,7 @@ func TestIndexRW_Create_Open(t *testing.T) {
require.NoError(t, err, "create index writer") require.NoError(t, err, "create index writer")
require.NoError(t, iw.Close(), "close index writer") require.NoError(t, iw.Close(), "close index writer")
ir, err := newIndexReader(dir) ir, err := NewFileIndexReader(filepath.Join(dir, "index"))
require.NoError(t, err, "open index reader") require.NoError(t, err, "open index reader")
require.NoError(t, ir.Close(), "close index reader") require.NoError(t, ir.Close(), "close index reader")
@ -169,7 +169,7 @@ func TestIndexRW_Create_Open(t *testing.T) {
_, err = f.WriteAt([]byte{0, 0}, 0) _, err = f.WriteAt([]byte{0, 0}, 0)
require.NoError(t, err) require.NoError(t, err)
_, err = newIndexReader(dir) _, err = NewFileIndexReader(dir)
require.Error(t, err) require.Error(t, err)
} }
@ -210,7 +210,7 @@ func TestIndexRW_Postings(t *testing.T) {
require.NoError(t, iw.Close()) require.NoError(t, iw.Close())
ir, err := newIndexReader(dir) ir, err := NewFileIndexReader(filepath.Join(dir, "index"))
require.NoError(t, err, "open index reader") require.NoError(t, err, "open index reader")
p, err := ir.Postings("a", "1") p, err := ir.Postings("a", "1")
@ -325,7 +325,7 @@ func TestPersistence_index_e2e(t *testing.T) {
err = iw.Close() err = iw.Close()
require.NoError(t, err) require.NoError(t, err)
ir, err := newIndexReader(dir) ir, err := NewFileIndexReader(filepath.Join(dir, "index"))
require.NoError(t, err) require.NoError(t, err)
for p := range mi.postings.m { for p := range mi.postings.m {

View File

@ -14,6 +14,9 @@
package test package test
import ( import (
"crypto/rand"
"fmt"
"hash/crc32"
"testing" "testing"
"github.com/cespare/xxhash" "github.com/cespare/xxhash"
@ -76,3 +79,46 @@ func fnv64a(b []byte) uint64 {
} }
return h return h
} }
func BenchmarkCRC32_diff(b *testing.B) {
data := [][]byte{}
for i := 0; i < 1000; i++ {
b := make([]byte, 512)
rand.Read(b)
data = append(data, b)
}
ctab := crc32.MakeTable(crc32.Castagnoli)
total := uint32(0)
b.Run("direct", func(b *testing.B) {
b.ReportAllocs()
for i := 0; i < b.N; i++ {
total += crc32.Checksum(data[i%1000], ctab)
}
})
b.Run("hash-reuse", func(b *testing.B) {
b.ReportAllocs()
h := crc32.New(ctab)
for i := 0; i < b.N; i++ {
h.Reset()
h.Write(data[i%1000])
total += h.Sum32()
}
})
b.Run("hash-new", func(b *testing.B) {
b.ReportAllocs()
for i := 0; i < b.N; i++ {
h := crc32.New(ctab)
h.Write(data[i%1000])
total += h.Sum32()
}
})
fmt.Println(total)
}