diff --git a/block.go b/block.go index 2bd1fd364..034f33673 100644 --- a/block.go +++ b/block.go @@ -142,8 +142,8 @@ type Block struct { dir string meta BlockMeta - chunkr *chunkReader - indexr *indexReader + chunkr ChunkReader + indexr IndexReader tombstones tombstoneReader } @@ -156,11 +156,11 @@ func OpenBlock(dir string, pool chunks.Pool) (*Block, error) { return nil, err } - cr, err := newChunkReader(chunkDir(dir), pool) + cr, err := NewDirChunkReader(chunkDir(dir), pool) if err != nil { return nil, err } - ir, err := newIndexReader(dir) + ir, err := NewFileIndexReader(filepath.Join(dir, "index")) if err != nil { return nil, err } diff --git a/chunks.go b/chunks.go index f6e329b79..30ab93f80 100644 --- a/chunks.go +++ b/chunks.go @@ -298,7 +298,7 @@ type ChunkReader interface { // of series data. type chunkReader struct { // The underlying bytes holding the encoded series data. - bs [][]byte + bs []ByteSlice // Closers for resources behind the byte slices. cs []io.Closer @@ -306,8 +306,32 @@ type chunkReader struct { pool chunks.Pool } -// newChunkReader returns a new chunkReader based on mmaped files found in dir. -func newChunkReader(dir string, pool chunks.Pool) (*chunkReader, error) { +func newChunkReader(bs []ByteSlice, cs []io.Closer, pool chunks.Pool) (*chunkReader, error) { + cr := chunkReader{pool: pool, bs: bs, cs: cs} + + for i, b := range cr.bs { + if b.Len() < 4 { + return nil, errors.Wrapf(errInvalidSize, "validate magic in segment %d", i) + } + // Verify magic number. + if m := binary.BigEndian.Uint32(b.Range(0, 4)); m != MagicChunks { + return nil, fmt.Errorf("invalid magic number %x", m) + } + } + return &cr, nil +} + +// NewChunkReader returns a new chunk reader against the given byte slices. +func NewChunkReader(bs []ByteSlice, pool chunks.Pool) (ChunkReader, error) { + if pool == nil { + pool = chunks.NewPool() + } + return newChunkReader(bs, nil, pool) +} + +// NewDirChunkReader returns a new ChunkReader against sequentially numbered files in the +// given directory. +func NewDirChunkReader(dir string, pool chunks.Pool) (ChunkReader, error) { files, err := sequenceFiles(dir) if err != nil { return nil, err @@ -315,27 +339,19 @@ func newChunkReader(dir string, pool chunks.Pool) (*chunkReader, error) { if pool == nil { pool = chunks.NewPool() } - cr := chunkReader{pool: pool} + + var bs []ByteSlice + var cs []io.Closer for _, fn := range files { f, err := openMmapFile(fn) if err != nil { return nil, errors.Wrapf(err, "mmap files") } - cr.cs = append(cr.cs, f) - cr.bs = append(cr.bs, f.b) + cs = append(cs, f) + bs = append(bs, realByteSlice(f.b)) } - - for i, b := range cr.bs { - if len(b) < 4 { - return nil, errors.Wrapf(errInvalidSize, "validate magic in segment %d", i) - } - // Verify magic number. - if m := binary.BigEndian.Uint32(b[:4]); m != MagicChunks { - return nil, fmt.Errorf("invalid magic number %x", m) - } - } - return &cr, nil + return newChunkReader(bs, cs, pool) } func (s *chunkReader) Close() error { @@ -352,16 +368,18 @@ func (s *chunkReader) Chunk(ref uint64) (chunks.Chunk, error) { } b := s.bs[seq] - if int(off) >= len(b) { - return nil, errors.Errorf("offset %d beyond data size %d", off, len(b)) + if int(off) >= b.Len() { + return nil, errors.Errorf("offset %d beyond data size %d", off, b.Len()) } - b = b[off:] + // With the minimum chunk length this should never cause us reading + // over the end of the slice. + r := b.Range(off, off+binary.MaxVarintLen32) - l, n := binary.Uvarint(b) + l, n := binary.Uvarint(r) if n < 0 { return nil, fmt.Errorf("reading chunk length failed") } - b = b[n:] + r = b.Range(off+n, off+n+int(l)) - return s.pool.Get(chunks.Encoding(b[0]), b[1:1+l]) + return s.pool.Get(chunks.Encoding(r[0]), r[1:1+l]) } diff --git a/encoding_helpers.go b/encoding_helpers.go index 9aa4ba409..b55c7fda9 100644 --- a/encoding_helpers.go +++ b/encoding_helpers.go @@ -3,6 +3,7 @@ package tsdb import ( "encoding/binary" "hash" + "hash/crc32" "unsafe" ) @@ -77,6 +78,11 @@ func (d *decbuf) uvarint32() uint32 { return uint32(d.uvarint64()) } func (d *decbuf) be32int() int { return int(d.be32()) } func (d *decbuf) be64int64() int64 { return int64(d.be64()) } +// crc32 returns a CRC32 checksum over the remaining bytes. +func (d *decbuf) crc32() uint32 { + return crc32.Checksum(d.b, castagnoliTable) +} + func (d *decbuf) uvarintStr() string { l := d.uvarint64() if d.e != nil { diff --git a/index.go b/index.go index 36507f645..6895c16f4 100644 --- a/index.go +++ b/index.go @@ -560,7 +560,7 @@ type StringTuples interface { type indexReader struct { // The underlying byte slice holding the encoded series data. - b []byte + b ByteSlice toc indexTOC // Close that releases the underlying resources of the byte slice. @@ -585,27 +585,52 @@ var ( errInvalidChecksum = fmt.Errorf("invalid checksum") ) -// NewIndexReader returns a new IndexReader on the given directory. -func NewIndexReader(dir string) (IndexReader, error) { return newIndexReader(dir) } +// ByteSlice abstracts a byte slice. +type ByteSlice interface { + Len() int + Range(start, end int) []byte +} -// newIndexReader returns a new indexReader on the given directory. -func newIndexReader(dir string) (*indexReader, error) { - f, err := openMmapFile(filepath.Join(dir, "index")) +type realByteSlice []byte + +func (b realByteSlice) Len() int { + return len(b) +} + +func (b realByteSlice) Range(start, end int) []byte { + return b[start:end] +} + +func (b realByteSlice) Sub(start, end int) ByteSlice { + return b[start:end] +} + +// NewIndexReader returns a new IndexReader on the given byte slice. +func NewIndexReader(b ByteSlice) (IndexReader, error) { + return newIndexReader(b, nil) +} + +// NewFileIndexReader returns a new index reader against the given index file. +func NewFileIndexReader(path string) (IndexReader, error) { + f, err := openMmapFile(path) if err != nil { return nil, err } + return newIndexReader(realByteSlice(f.b), f) +} + +func newIndexReader(b ByteSlice, c io.Closer) (*indexReader, error) { r := &indexReader{ - b: f.b, - c: f, + b: b, + c: c, symbols: map[uint32]string{}, crc32: newCRC32(), } - // Verify magic number. - if len(f.b) < 4 { + if b.Len() < 4 { return nil, errors.Wrap(errInvalidSize, "index header") } - if m := binary.BigEndian.Uint32(r.b[:4]); m != MagicIndex { + if m := binary.BigEndian.Uint32(r.b.Range(0, 4)); m != MagicIndex { return nil, errors.Errorf("invalid magic number %x", m) } @@ -615,6 +640,7 @@ func newIndexReader(dir string) (*indexReader, error) { if err := r.readSymbols(int(r.toc.symbols)); err != nil { return nil, errors.Wrap(err, "read symbols") } + var err error r.labels, err = r.readOffsetTable(r.toc.labelIndicesTable) if err != nil { @@ -625,7 +651,17 @@ func newIndexReader(dir string) (*indexReader, error) { } func (r *indexReader) readTOC() error { - d := r.decbufAt(len(r.b) - indexTOCLen) + if r.b.Len() < indexTOCLen { + return errInvalidSize + } + b := r.b.Range(r.b.Len()-indexTOCLen, r.b.Len()) + + expCRC := binary.BigEndian.Uint32(b[len(b)-4:]) + d := decbuf{b: b[:len(b)-4]} + + if d.crc32() != expCRC { + return errInvalidChecksum + } r.toc.symbols = d.be64() r.toc.series = d.be64() @@ -634,32 +670,61 @@ func (r *indexReader) readTOC() error { r.toc.postings = d.be64() r.toc.postingsTable = d.be64() - if valid, err := r.checkCRC(d.be32(), len(r.b)-indexTOCLen, indexTOCLen-4); !valid { - return errors.Wrap(err, "TOC checksum") - } - return d.err() } +// decbufAt returns a new decoding buffer. It expects the first 4 bytes +// after offset to hold the big endian encoded content length, followed by the contents and the expected +// checksum. func (r *indexReader) decbufAt(off int) decbuf { - if len(r.b) < off { + if r.b.Len() < off+4 { return decbuf{e: errInvalidSize} } - return decbuf{b: r.b[off:]} + b := r.b.Range(off, off+4) + l := int(binary.BigEndian.Uint32(b)) + + if r.b.Len() < off+4+l+4 { + return decbuf{e: errInvalidSize} + } + + // Load bytes holding the contents plus a CRC32 checksum. + b = r.b.Range(off+4, off+4+l+4) + dec := decbuf{b: b[:len(b)-4]} + + if exp := binary.BigEndian.Uint32(b[len(b)-4:]); dec.crc32() != exp { + return decbuf{e: errInvalidChecksum} + } + return dec } -func (r *indexReader) checkCRC(crc uint32, off, cnt int) (bool, error) { - r.crc32.Reset() - if len(r.b) < off+cnt { - return false, errInvalidSize +// decbufUvarintAt returns a new decoding buffer. It expects the first bytes +// after offset to hold the uvarint-encoded buffers length, followed by the contents and the expected +// checksum. +func (r *indexReader) decbufUvarintAt(off int) decbuf { + // We never have to access this method at the far end of the byte slice. Thus just checking + // against the MaxVarintLen32 is sufficient. + if r.b.Len() < off+binary.MaxVarintLen32 { + return decbuf{e: errInvalidSize} } - if _, err := r.crc32.Write(r.b[off : off+cnt]); err != nil { - return false, errors.Wrap(err, "write to hash") + b := r.b.Range(off, off+binary.MaxVarintLen32) + + l, n := binary.Uvarint(b) + if n > binary.MaxVarintLen32 { + return decbuf{e: errors.New("invalid uvarint")} } - if r.crc32.Sum32() != crc { - return false, errInvalidChecksum + + if r.b.Len() < off+n+int(l)+4 { + return decbuf{e: errInvalidSize} } - return true, nil + + // Load bytes holding the contents plus a CRC32 checksum. + b = r.b.Range(off+n, off+n+int(l)+4) + dec := decbuf{b: b[:len(b)-4]} + + if dec.crc32() != binary.BigEndian.Uint32(b[len(b)-4:]) { + return decbuf{e: errInvalidChecksum} + } + return dec } // readSymbols reads the symbol table fully into memory and allocates proper strings for them. @@ -669,26 +734,22 @@ func (r *indexReader) readSymbols(off int) error { if off == 0 { return nil } + d := r.decbufAt(off) + var ( - d1 = r.decbufAt(int(off)) - l = d1.be32int() - d2 = d1.decbuf(l) - origLen = d2.len() - cnt = d2.be32int() + origLen = d.len() + cnt = d.be32int() basePos = uint32(off) + 4 - nextPos = basePos + uint32(origLen-d2.len()) + nextPos = basePos + uint32(origLen-d.len()) ) - for d2.err() == nil && d2.len() > 0 && cnt > 0 { - s := d2.uvarintStr() + for d.err() == nil && d.len() > 0 && cnt > 0 { + s := d.uvarintStr() r.symbols[uint32(nextPos)] = s - nextPos = basePos + uint32(origLen-d2.len()) + nextPos = basePos + uint32(origLen-d.len()) cnt-- } - if valid, err := r.checkCRC(d1.be32(), int(off)+4, l); !valid { - return errors.Wrap(err, "symbol table checksum") - } - return d2.err() + return d.err() } // readOffsetTable reads an offset table at the given position and returns a map @@ -696,57 +757,29 @@ func (r *indexReader) readSymbols(off int) error { func (r *indexReader) readOffsetTable(off uint64) (map[string]uint32, error) { const sep = "\xff" - var ( - d1 = r.decbufAt(int(off)) - l = d1.be32int() - d2 = d1.decbuf(l) - cnt = d2.be32() - ) + d := r.decbufAt(int(off)) + cnt := d.be32() - res := make(map[string]uint32, 512) + res := make(map[string]uint32, cnt) - for d2.err() == nil && d2.len() > 0 && cnt > 0 { - keyCount := int(d2.uvarint()) + for d.err() == nil && d.len() > 0 && cnt > 0 { + keyCount := int(d.uvarint()) keys := make([]string, 0, keyCount) for i := 0; i < keyCount; i++ { - keys = append(keys, d2.uvarintStr()) + keys = append(keys, d.uvarintStr()) } - res[strings.Join(keys, sep)] = uint32(d2.uvarint()) + res[strings.Join(keys, sep)] = uint32(d.uvarint()) cnt-- } - - if valid, err := r.checkCRC(d1.be32(), int(off)+4, l); !valid { - return res, errors.Wrap(err, "offset table checksum") - } - - return res, d2.err() + return res, d.err() } func (r *indexReader) Close() error { return r.c.Close() } -func (r *indexReader) section(o uint32) (byte, []byte, error) { - b := r.b[o:] - - if len(b) < 5 { - return 0, nil, errors.Wrap(errInvalidSize, "read header") - } - - flag := b[0] - l := binary.BigEndian.Uint32(b[1:5]) - - b = b[5:] - - // b must have the given length plus 4 bytes for the CRC32 checksum. - if len(b) < int(l)+4 { - return 0, nil, errors.Wrap(errInvalidSize, "section content") - } - return flag, b[:l], nil -} - func (r *indexReader) lookupSymbol(o uint32) (string, error) { s, ok := r.symbols[o] if !ok { @@ -776,24 +809,17 @@ func (r *indexReader) LabelValues(names ...string) (StringTuples, error) { //return nil, fmt.Errorf("label index doesn't exist") } - d1 := r.decbufAt(int(off)) - l := d1.be32int() - d2 := d1.decbuf(l) + d := r.decbufAt(int(off)) - nc := d2.be32int() - d2.be32() // consume unused value entry count. + nc := d.be32int() + d.be32() // consume unused value entry count. - if d2.err() != nil { - return nil, errors.Wrap(d2.err(), "read label value index") + if d.err() != nil { + return nil, errors.Wrap(d.err(), "read label value index") } - - if valid, err := r.checkCRC(d1.be32(), int(off)+4, l); !valid { - return nil, errors.Wrap(err, "read label values checksum") - } - st := &serializedStringTuples{ l: nc, - b: d2.get(), + b: d.get(), lookup: r.lookupSymbol, } return st, nil @@ -816,22 +842,19 @@ func (r *indexReader) LabelIndices() ([][]string, error) { } func (r *indexReader) Series(ref uint64, lbls *labels.Labels, chks *[]ChunkMeta) error { - d1 := r.decbufAt(int(ref)) - l := d1.uvarint() - sl := len(r.b[ref:]) - d1.len() // # bytes in l - d2 := d1.decbuf(l) + d := r.decbufUvarintAt(int(ref)) *lbls = (*lbls)[:0] *chks = (*chks)[:0] - k := int(d2.uvarint()) + k := int(d.uvarint()) for i := 0; i < k; i++ { - lno := uint32(d2.uvarint()) - lvo := uint32(d2.uvarint()) + lno := uint32(d.uvarint()) + lvo := uint32(d.uvarint()) - if d2.err() != nil { - return errors.Wrap(d2.err(), "read series label offsets") + if d.err() != nil { + return errors.Wrap(d.err(), "read series label offsets") } ln, err := r.lookupSymbol(lno) @@ -847,15 +870,15 @@ func (r *indexReader) Series(ref uint64, lbls *labels.Labels, chks *[]ChunkMeta) } // Read the chunks meta data. - k = int(d2.uvarint()) + k = int(d.uvarint()) if k == 0 { return nil } - t0 := d2.varint64() - maxt := int64(d2.uvarint64()) + t0 - ref0 := int64(d2.uvarint64()) + t0 := d.varint64() + maxt := int64(d.uvarint64()) + t0 + ref0 := int64(d.uvarint64()) *chks = append(*chks, ChunkMeta{ Ref: uint64(ref0), @@ -865,14 +888,14 @@ func (r *indexReader) Series(ref uint64, lbls *labels.Labels, chks *[]ChunkMeta) t0 = maxt for i := 1; i < k; i++ { - mint := int64(d2.uvarint64()) + t0 - maxt := int64(d2.uvarint64()) + mint + mint := int64(d.uvarint64()) + t0 + maxt := int64(d.uvarint64()) + mint - ref0 += d2.varint64() + ref0 += d.varint64() t0 = maxt - if d2.err() != nil { - return errors.Wrapf(d2.err(), "read meta for chunk %d", i) + if d.err() != nil { + return errors.Wrapf(d.err(), "read meta for chunk %d", i) } *chks = append(*chks, ChunkMeta{ @@ -881,12 +904,7 @@ func (r *indexReader) Series(ref uint64, lbls *labels.Labels, chks *[]ChunkMeta) MaxTime: maxt, }) } - - if valid, err := r.checkCRC(d1.be32(), int(ref)+sl, l); !valid { - return errors.Wrap(err, "series checksum") - } - - return nil + return d.err() } func (r *indexReader) Postings(name, value string) (Postings, error) { @@ -897,22 +915,10 @@ func (r *indexReader) Postings(name, value string) (Postings, error) { if !ok { return emptyPostings, nil } + d := r.decbufAt(int(off)) + d.be32() // consume unused postings list length. - d1 := r.decbufAt(int(off)) - l := d1.be32int() - d2 := d1.decbuf(l) - - d2.be32() // consume unused postings list length. - - if d2.err() != nil { - return nil, errors.Wrap(d2.err(), "get postings bytes") - } - - if valid, err := r.checkCRC(d1.be32(), int(off)+4, l); !valid { - return nil, errors.Wrap(err, "postings checksum") - } - - return newBigEndianPostings(d2.get()), nil + return newBigEndianPostings(d.get()), errors.Wrap(d.err(), "get postings bytes") } func (r *indexReader) SortedPostings(p Postings) Postings { diff --git a/index_test.go b/index_test.go index 7b908e39a..f18aba0eb 100644 --- a/index_test.go +++ b/index_test.go @@ -159,7 +159,7 @@ func TestIndexRW_Create_Open(t *testing.T) { require.NoError(t, err, "create index writer") require.NoError(t, iw.Close(), "close index writer") - ir, err := newIndexReader(dir) + ir, err := NewFileIndexReader(filepath.Join(dir, "index")) require.NoError(t, err, "open index reader") require.NoError(t, ir.Close(), "close index reader") @@ -169,7 +169,7 @@ func TestIndexRW_Create_Open(t *testing.T) { _, err = f.WriteAt([]byte{0, 0}, 0) require.NoError(t, err) - _, err = newIndexReader(dir) + _, err = NewFileIndexReader(dir) require.Error(t, err) } @@ -210,7 +210,7 @@ func TestIndexRW_Postings(t *testing.T) { require.NoError(t, iw.Close()) - ir, err := newIndexReader(dir) + ir, err := NewFileIndexReader(filepath.Join(dir, "index")) require.NoError(t, err, "open index reader") p, err := ir.Postings("a", "1") @@ -325,7 +325,7 @@ func TestPersistence_index_e2e(t *testing.T) { err = iw.Close() require.NoError(t, err) - ir, err := newIndexReader(dir) + ir, err := NewFileIndexReader(filepath.Join(dir, "index")) require.NoError(t, err) for p := range mi.postings.m { diff --git a/test/hash_test.go b/test/hash_test.go index fa448b4c6..117616fd7 100644 --- a/test/hash_test.go +++ b/test/hash_test.go @@ -14,6 +14,9 @@ package test import ( + "crypto/rand" + "fmt" + "hash/crc32" "testing" "github.com/cespare/xxhash" @@ -76,3 +79,46 @@ func fnv64a(b []byte) uint64 { } return h } + +func BenchmarkCRC32_diff(b *testing.B) { + + data := [][]byte{} + + for i := 0; i < 1000; i++ { + b := make([]byte, 512) + rand.Read(b) + data = append(data, b) + } + + ctab := crc32.MakeTable(crc32.Castagnoli) + total := uint32(0) + + b.Run("direct", func(b *testing.B) { + b.ReportAllocs() + + for i := 0; i < b.N; i++ { + total += crc32.Checksum(data[i%1000], ctab) + } + }) + b.Run("hash-reuse", func(b *testing.B) { + b.ReportAllocs() + h := crc32.New(ctab) + + for i := 0; i < b.N; i++ { + h.Reset() + h.Write(data[i%1000]) + total += h.Sum32() + } + }) + b.Run("hash-new", func(b *testing.B) { + b.ReportAllocs() + + for i := 0; i < b.N; i++ { + h := crc32.New(ctab) + h.Write(data[i%1000]) + total += h.Sum32() + } + }) + + fmt.Println(total) +}