From c354d6bd590f5857b563ec23ce986fcaa7cabadb Mon Sep 17 00:00:00 2001 From: Fabian Reinartz Date: Thu, 9 Nov 2017 15:56:40 +0000 Subject: [PATCH 1/4] index: simplify checksum validation --- encoding_helpers.go | 6 +++ index.go | 89 ++++++++++++++++++--------------------------- test/hash_test.go | 46 +++++++++++++++++++++++ 3 files changed, 88 insertions(+), 53 deletions(-) diff --git a/encoding_helpers.go b/encoding_helpers.go index 9aa4ba409..b55c7fda9 100644 --- a/encoding_helpers.go +++ b/encoding_helpers.go @@ -3,6 +3,7 @@ package tsdb import ( "encoding/binary" "hash" + "hash/crc32" "unsafe" ) @@ -77,6 +78,11 @@ func (d *decbuf) uvarint32() uint32 { return uint32(d.uvarint64()) } func (d *decbuf) be32int() int { return int(d.be32()) } func (d *decbuf) be64int64() int64 { return int64(d.be64()) } +// crc32 returns a CRC32 checksum over the remaining bytes. +func (d *decbuf) crc32() uint32 { + return crc32.Checksum(d.b, castagnoliTable) +} + func (d *decbuf) uvarintStr() string { l := d.uvarint64() if d.e != nil { diff --git a/index.go b/index.go index 36507f645..36783ced2 100644 --- a/index.go +++ b/index.go @@ -625,20 +625,24 @@ func newIndexReader(dir string) (*indexReader, error) { } func (r *indexReader) readTOC() error { - d := r.decbufAt(len(r.b) - indexTOCLen) + d1 := r.decbufAt(len(r.b) - indexTOCLen) + d2 := d1.decbuf(indexTOCLen - 4) + crc := d2.crc32() - r.toc.symbols = d.be64() - r.toc.series = d.be64() - r.toc.labelIndices = d.be64() - r.toc.labelIndicesTable = d.be64() - r.toc.postings = d.be64() - r.toc.postingsTable = d.be64() + r.toc.symbols = d2.be64() + r.toc.series = d2.be64() + r.toc.labelIndices = d2.be64() + r.toc.labelIndicesTable = d2.be64() + r.toc.postings = d2.be64() + r.toc.postingsTable = d2.be64() - if valid, err := r.checkCRC(d.be32(), len(r.b)-indexTOCLen, indexTOCLen-4); !valid { - return errors.Wrap(err, "TOC checksum") + if d2.err() != nil { + return d2.err() } - - return d.err() + if read := d1.be32(); crc != read { + return errors.Wrap(errInvalidChecksum, "read TOC") + } + return d1.err() } func (r *indexReader) decbufAt(off int) decbuf { @@ -648,20 +652,6 @@ func (r *indexReader) decbufAt(off int) decbuf { return decbuf{b: r.b[off:]} } -func (r *indexReader) checkCRC(crc uint32, off, cnt int) (bool, error) { - r.crc32.Reset() - if len(r.b) < off+cnt { - return false, errInvalidSize - } - if _, err := r.crc32.Write(r.b[off : off+cnt]); err != nil { - return false, errors.Wrap(err, "write to hash") - } - if r.crc32.Sum32() != crc { - return false, errInvalidChecksum - } - return true, nil -} - // readSymbols reads the symbol table fully into memory and allocates proper strings for them. // Strings backed by the mmap'd memory would cause memory faults if applications keep using them // after the reader is closed. @@ -671,8 +661,8 @@ func (r *indexReader) readSymbols(off int) error { } var ( d1 = r.decbufAt(int(off)) - l = d1.be32int() - d2 = d1.decbuf(l) + d2 = d1.decbuf(d1.be32int()) + crc = d2.crc32() origLen = d2.len() cnt = d2.be32int() basePos = uint32(off) + 4 @@ -685,8 +675,8 @@ func (r *indexReader) readSymbols(off int) error { nextPos = basePos + uint32(origLen-d2.len()) cnt-- } - if valid, err := r.checkCRC(d1.be32(), int(off)+4, l); !valid { - return errors.Wrap(err, "symbol table checksum") + if read := d1.be32(); crc != read { + return errors.Wrap(errInvalidChecksum, "read symbols") } return d2.err() } @@ -698,8 +688,8 @@ func (r *indexReader) readOffsetTable(off uint64) (map[string]uint32, error) { var ( d1 = r.decbufAt(int(off)) - l = d1.be32int() - d2 = d1.decbuf(l) + d2 = d1.decbuf(d1.be32int()) + crc = d2.crc32() cnt = d2.be32() ) @@ -716,11 +706,9 @@ func (r *indexReader) readOffsetTable(off uint64) (map[string]uint32, error) { cnt-- } - - if valid, err := r.checkCRC(d1.be32(), int(off)+4, l); !valid { - return res, errors.Wrap(err, "offset table checksum") + if read := d1.be32(); crc != read { + return nil, errors.Wrap(errInvalidChecksum, "read offset table") } - return res, d2.err() } @@ -777,8 +765,8 @@ func (r *indexReader) LabelValues(names ...string) (StringTuples, error) { } d1 := r.decbufAt(int(off)) - l := d1.be32int() - d2 := d1.decbuf(l) + d2 := d1.decbuf(d1.be32int()) + crc := d2.crc32() nc := d2.be32int() d2.be32() // consume unused value entry count. @@ -787,10 +775,9 @@ func (r *indexReader) LabelValues(names ...string) (StringTuples, error) { return nil, errors.Wrap(d2.err(), "read label value index") } - if valid, err := r.checkCRC(d1.be32(), int(off)+4, l); !valid { - return nil, errors.Wrap(err, "read label values checksum") + if read := d1.be32(); crc != read { + return nil, errors.Wrap(errInvalidChecksum, "read label values") } - st := &serializedStringTuples{ l: nc, b: d2.get(), @@ -817,9 +804,8 @@ func (r *indexReader) LabelIndices() ([][]string, error) { func (r *indexReader) Series(ref uint64, lbls *labels.Labels, chks *[]ChunkMeta) error { d1 := r.decbufAt(int(ref)) - l := d1.uvarint() - sl := len(r.b[ref:]) - d1.len() // # bytes in l - d2 := d1.decbuf(l) + d2 := d1.decbuf(d1.uvarint()) + crc := d2.crc32() *lbls = (*lbls)[:0] *chks = (*chks)[:0] @@ -881,11 +867,9 @@ func (r *indexReader) Series(ref uint64, lbls *labels.Labels, chks *[]ChunkMeta) MaxTime: maxt, }) } - - if valid, err := r.checkCRC(d1.be32(), int(ref)+sl, l); !valid { - return errors.Wrap(err, "series checksum") + if read := d1.be32(); crc != read { + return errors.Wrap(errInvalidChecksum, "read series") } - return nil } @@ -899,19 +883,18 @@ func (r *indexReader) Postings(name, value string) (Postings, error) { } d1 := r.decbufAt(int(off)) - l := d1.be32int() - d2 := d1.decbuf(l) + d2 := d1.decbuf(d1.be32int()) + + crc := d2.crc32() d2.be32() // consume unused postings list length. if d2.err() != nil { return nil, errors.Wrap(d2.err(), "get postings bytes") } - - if valid, err := r.checkCRC(d1.be32(), int(off)+4, l); !valid { - return nil, errors.Wrap(err, "postings checksum") + if read := d1.be32(); crc != read { + return nil, errors.Wrap(errInvalidChecksum, "read postings") } - return newBigEndianPostings(d2.get()), nil } diff --git a/test/hash_test.go b/test/hash_test.go index fa448b4c6..117616fd7 100644 --- a/test/hash_test.go +++ b/test/hash_test.go @@ -14,6 +14,9 @@ package test import ( + "crypto/rand" + "fmt" + "hash/crc32" "testing" "github.com/cespare/xxhash" @@ -76,3 +79,46 @@ func fnv64a(b []byte) uint64 { } return h } + +func BenchmarkCRC32_diff(b *testing.B) { + + data := [][]byte{} + + for i := 0; i < 1000; i++ { + b := make([]byte, 512) + rand.Read(b) + data = append(data, b) + } + + ctab := crc32.MakeTable(crc32.Castagnoli) + total := uint32(0) + + b.Run("direct", func(b *testing.B) { + b.ReportAllocs() + + for i := 0; i < b.N; i++ { + total += crc32.Checksum(data[i%1000], ctab) + } + }) + b.Run("hash-reuse", func(b *testing.B) { + b.ReportAllocs() + h := crc32.New(ctab) + + for i := 0; i < b.N; i++ { + h.Reset() + h.Write(data[i%1000]) + total += h.Sum32() + } + }) + b.Run("hash-new", func(b *testing.B) { + b.ReportAllocs() + + for i := 0; i < b.N; i++ { + h := crc32.New(ctab) + h.Write(data[i%1000]) + total += h.Sum32() + } + }) + + fmt.Println(total) +} From b7c3cfecbf6355b10b1171945e37f6bce6697724 Mon Sep 17 00:00:00 2001 From: Fabian Reinartz Date: Thu, 9 Nov 2017 17:27:09 +0000 Subject: [PATCH 2/4] index: abstract ByteSlice and adjust indexReader This replaces the builtin byte slice with an interface for the index reader. This allows the complex decoding of the index file format to be used against more generalized implementations. --- block.go | 6 +- index.go | 269 +++++++++++++++++++++++++++----------------------- index_test.go | 8 +- 3 files changed, 153 insertions(+), 130 deletions(-) diff --git a/block.go b/block.go index 2bd1fd364..2d7bab2d8 100644 --- a/block.go +++ b/block.go @@ -142,8 +142,8 @@ type Block struct { dir string meta BlockMeta - chunkr *chunkReader - indexr *indexReader + chunkr ChunkReader + indexr IndexReader tombstones tombstoneReader } @@ -160,7 +160,7 @@ func OpenBlock(dir string, pool chunks.Pool) (*Block, error) { if err != nil { return nil, err } - ir, err := newIndexReader(dir) + ir, err := NewFileIndexReader(filepath.Join(dir, "index")) if err != nil { return nil, err } diff --git a/index.go b/index.go index 36783ced2..277807e21 100644 --- a/index.go +++ b/index.go @@ -560,7 +560,7 @@ type StringTuples interface { type indexReader struct { // The underlying byte slice holding the encoded series data. - b []byte + b ByteSlice toc indexTOC // Close that releases the underlying resources of the byte slice. @@ -585,27 +585,52 @@ var ( errInvalidChecksum = fmt.Errorf("invalid checksum") ) -// NewIndexReader returns a new IndexReader on the given directory. -func NewIndexReader(dir string) (IndexReader, error) { return newIndexReader(dir) } +// ByteSlice abstracts a byte slice. +type ByteSlice interface { + Len() int + Range(start, end int) []byte +} -// newIndexReader returns a new indexReader on the given directory. -func newIndexReader(dir string) (*indexReader, error) { - f, err := openMmapFile(filepath.Join(dir, "index")) +type realByteSlice []byte + +func (b realByteSlice) Len() int { + return len(b) +} + +func (b realByteSlice) Range(start, end int) []byte { + return b[start:end] +} + +func (b realByteSlice) Sub(start, end int) ByteSlice { + return b[start:end] +} + +// NewIndexReader returns a new IndexReader on the given directory. +func NewIndexReader(b ByteSlice) (IndexReader, error) { + return newIndexReader(b, nil) +} + +func NewFileIndexReader(path string) (IndexReader, error) { + f, err := openMmapFile(path) if err != nil { return nil, err } + return newIndexReader(realByteSlice(f.b), f) +} + +// newIndexReader returns a new indexReader on the given directory. +func newIndexReader(b ByteSlice, c io.Closer) (*indexReader, error) { r := &indexReader{ - b: f.b, - c: f, + b: b, + c: c, symbols: map[uint32]string{}, crc32: newCRC32(), } - // Verify magic number. - if len(f.b) < 4 { + if b.Len() < 4 { return nil, errors.Wrap(errInvalidSize, "index header") } - if m := binary.BigEndian.Uint32(r.b[:4]); m != MagicIndex { + if m := binary.BigEndian.Uint32(r.b.Range(0, 4)); m != MagicIndex { return nil, errors.Errorf("invalid magic number %x", m) } @@ -615,6 +640,7 @@ func newIndexReader(dir string) (*indexReader, error) { if err := r.readSymbols(int(r.toc.symbols)); err != nil { return nil, errors.Wrap(err, "read symbols") } + var err error r.labels, err = r.readOffsetTable(r.toc.labelIndicesTable) if err != nil { @@ -625,31 +651,80 @@ func newIndexReader(dir string) (*indexReader, error) { } func (r *indexReader) readTOC() error { - d1 := r.decbufAt(len(r.b) - indexTOCLen) - d2 := d1.decbuf(indexTOCLen - 4) - crc := d2.crc32() - - r.toc.symbols = d2.be64() - r.toc.series = d2.be64() - r.toc.labelIndices = d2.be64() - r.toc.labelIndicesTable = d2.be64() - r.toc.postings = d2.be64() - r.toc.postingsTable = d2.be64() - - if d2.err() != nil { - return d2.err() + if r.b.Len() < indexTOCLen { + return errInvalidSize } - if read := d1.be32(); crc != read { - return errors.Wrap(errInvalidChecksum, "read TOC") + b := r.b.Range(r.b.Len()-indexTOCLen, r.b.Len()) + + expCRC := binary.BigEndian.Uint32(b[len(b)-4:]) + d := decbuf{b: b[:len(b)-4]} + + if d.crc32() != expCRC { + return (errInvalidChecksum) } - return d1.err() + + r.toc.symbols = d.be64() + r.toc.series = d.be64() + r.toc.labelIndices = d.be64() + r.toc.labelIndicesTable = d.be64() + r.toc.postings = d.be64() + r.toc.postingsTable = d.be64() + + return d.err() } +// decbufAt returns a new decoding buffer. It expects the first 4 bytes +// after offset to hold the big endian encoded content length, followed by the contents and the expected +// checksum. func (r *indexReader) decbufAt(off int) decbuf { - if len(r.b) < off { + if r.b.Len() < off+4 { return decbuf{e: errInvalidSize} } - return decbuf{b: r.b[off:]} + b := r.b.Range(off, off+4) + l := int(binary.BigEndian.Uint32(b)) + + if r.b.Len() < off+4+l+4 { + return decbuf{e: errInvalidSize} + } + + // Load bytes holding the contents plus a CRC32 checksum. + b = r.b.Range(off+4, off+4+l+4) + dec := decbuf{b: b[:len(b)-4]} + + if exp := binary.BigEndian.Uint32(b[len(b)-4:]); dec.crc32() != exp { + return decbuf{e: errInvalidChecksum} + } + return dec +} + +// decbufAt returns a new decoding buffer. It expects the first bytes +// after offset to hold the uvarint-encoded buffers length, followed by the contents and the expected +// checksum. +func (r *indexReader) decbufUvarintAt(off int) decbuf { + // We never have to access this method at the far end of the byte slice. Thus just checking + // against the MaxVarintLen32 is sufficient. + if r.b.Len() < off+binary.MaxVarintLen32 { + return decbuf{e: errInvalidSize} + } + b := r.b.Range(off, off+binary.MaxVarintLen32) + + l, n := binary.Uvarint(b) + if n > binary.MaxVarintLen32 { + return decbuf{e: errors.New("invalid uvarint")} + } + + if r.b.Len() < off+n+int(l)+4 { + return decbuf{e: errInvalidSize} + } + + // Load bytes holding the contents plus a CRC32 checksum. + b = r.b.Range(off+n, off+n+int(l)+4) + dec := decbuf{b: b[:len(b)-4]} + + if dec.crc32() != binary.BigEndian.Uint32(b[len(b)-4:]) { + return decbuf{e: errInvalidChecksum} + } + return dec } // readSymbols reads the symbol table fully into memory and allocates proper strings for them. @@ -659,26 +734,22 @@ func (r *indexReader) readSymbols(off int) error { if off == 0 { return nil } + d := r.decbufAt(off) + var ( - d1 = r.decbufAt(int(off)) - d2 = d1.decbuf(d1.be32int()) - crc = d2.crc32() - origLen = d2.len() - cnt = d2.be32int() + origLen = d.len() + cnt = d.be32int() basePos = uint32(off) + 4 - nextPos = basePos + uint32(origLen-d2.len()) + nextPos = basePos + uint32(origLen-d.len()) ) - for d2.err() == nil && d2.len() > 0 && cnt > 0 { - s := d2.uvarintStr() + for d.err() == nil && d.len() > 0 && cnt > 0 { + s := d.uvarintStr() r.symbols[uint32(nextPos)] = s - nextPos = basePos + uint32(origLen-d2.len()) + nextPos = basePos + uint32(origLen-d.len()) cnt-- } - if read := d1.be32(); crc != read { - return errors.Wrap(errInvalidChecksum, "read symbols") - } - return d2.err() + return d.err() } // readOffsetTable reads an offset table at the given position and returns a map @@ -686,55 +757,29 @@ func (r *indexReader) readSymbols(off int) error { func (r *indexReader) readOffsetTable(off uint64) (map[string]uint32, error) { const sep = "\xff" - var ( - d1 = r.decbufAt(int(off)) - d2 = d1.decbuf(d1.be32int()) - crc = d2.crc32() - cnt = d2.be32() - ) + d := r.decbufAt(int(off)) + cnt := d.be32() - res := make(map[string]uint32, 512) + res := make(map[string]uint32, cnt) - for d2.err() == nil && d2.len() > 0 && cnt > 0 { - keyCount := int(d2.uvarint()) + for d.err() == nil && d.len() > 0 && cnt > 0 { + keyCount := int(d.uvarint()) keys := make([]string, 0, keyCount) for i := 0; i < keyCount; i++ { - keys = append(keys, d2.uvarintStr()) + keys = append(keys, d.uvarintStr()) } - res[strings.Join(keys, sep)] = uint32(d2.uvarint()) + res[strings.Join(keys, sep)] = uint32(d.uvarint()) cnt-- } - if read := d1.be32(); crc != read { - return nil, errors.Wrap(errInvalidChecksum, "read offset table") - } - return res, d2.err() + return res, d.err() } func (r *indexReader) Close() error { return r.c.Close() } -func (r *indexReader) section(o uint32) (byte, []byte, error) { - b := r.b[o:] - - if len(b) < 5 { - return 0, nil, errors.Wrap(errInvalidSize, "read header") - } - - flag := b[0] - l := binary.BigEndian.Uint32(b[1:5]) - - b = b[5:] - - // b must have the given length plus 4 bytes for the CRC32 checksum. - if len(b) < int(l)+4 { - return 0, nil, errors.Wrap(errInvalidSize, "section content") - } - return flag, b[:l], nil -} - func (r *indexReader) lookupSymbol(o uint32) (string, error) { s, ok := r.symbols[o] if !ok { @@ -764,23 +809,17 @@ func (r *indexReader) LabelValues(names ...string) (StringTuples, error) { //return nil, fmt.Errorf("label index doesn't exist") } - d1 := r.decbufAt(int(off)) - d2 := d1.decbuf(d1.be32int()) - crc := d2.crc32() + d := r.decbufAt(int(off)) - nc := d2.be32int() - d2.be32() // consume unused value entry count. + nc := d.be32int() + d.be32() // consume unused value entry count. - if d2.err() != nil { - return nil, errors.Wrap(d2.err(), "read label value index") - } - - if read := d1.be32(); crc != read { - return nil, errors.Wrap(errInvalidChecksum, "read label values") + if d.err() != nil { + return nil, errors.Wrap(d.err(), "read label value index") } st := &serializedStringTuples{ l: nc, - b: d2.get(), + b: d.get(), lookup: r.lookupSymbol, } return st, nil @@ -803,21 +842,19 @@ func (r *indexReader) LabelIndices() ([][]string, error) { } func (r *indexReader) Series(ref uint64, lbls *labels.Labels, chks *[]ChunkMeta) error { - d1 := r.decbufAt(int(ref)) - d2 := d1.decbuf(d1.uvarint()) - crc := d2.crc32() + d := r.decbufUvarintAt(int(ref)) *lbls = (*lbls)[:0] *chks = (*chks)[:0] - k := int(d2.uvarint()) + k := int(d.uvarint()) for i := 0; i < k; i++ { - lno := uint32(d2.uvarint()) - lvo := uint32(d2.uvarint()) + lno := uint32(d.uvarint()) + lvo := uint32(d.uvarint()) - if d2.err() != nil { - return errors.Wrap(d2.err(), "read series label offsets") + if d.err() != nil { + return errors.Wrap(d.err(), "read series label offsets") } ln, err := r.lookupSymbol(lno) @@ -833,15 +870,15 @@ func (r *indexReader) Series(ref uint64, lbls *labels.Labels, chks *[]ChunkMeta) } // Read the chunks meta data. - k = int(d2.uvarint()) + k = int(d.uvarint()) if k == 0 { return nil } - t0 := d2.varint64() - maxt := int64(d2.uvarint64()) + t0 - ref0 := int64(d2.uvarint64()) + t0 := d.varint64() + maxt := int64(d.uvarint64()) + t0 + ref0 := int64(d.uvarint64()) *chks = append(*chks, ChunkMeta{ Ref: uint64(ref0), @@ -851,14 +888,14 @@ func (r *indexReader) Series(ref uint64, lbls *labels.Labels, chks *[]ChunkMeta) t0 = maxt for i := 1; i < k; i++ { - mint := int64(d2.uvarint64()) + t0 - maxt := int64(d2.uvarint64()) + mint + mint := int64(d.uvarint64()) + t0 + maxt := int64(d.uvarint64()) + mint - ref0 += d2.varint64() + ref0 += d.varint64() t0 = maxt - if d2.err() != nil { - return errors.Wrapf(d2.err(), "read meta for chunk %d", i) + if d.err() != nil { + return errors.Wrapf(d.err(), "read meta for chunk %d", i) } *chks = append(*chks, ChunkMeta{ @@ -867,10 +904,7 @@ func (r *indexReader) Series(ref uint64, lbls *labels.Labels, chks *[]ChunkMeta) MaxTime: maxt, }) } - if read := d1.be32(); crc != read { - return errors.Wrap(errInvalidChecksum, "read series") - } - return nil + return d.err() } func (r *indexReader) Postings(name, value string) (Postings, error) { @@ -881,21 +915,10 @@ func (r *indexReader) Postings(name, value string) (Postings, error) { if !ok { return emptyPostings, nil } + d := r.decbufAt(int(off)) + d.be32() // consume unused postings list length. - d1 := r.decbufAt(int(off)) - d2 := d1.decbuf(d1.be32int()) - - crc := d2.crc32() - - d2.be32() // consume unused postings list length. - - if d2.err() != nil { - return nil, errors.Wrap(d2.err(), "get postings bytes") - } - if read := d1.be32(); crc != read { - return nil, errors.Wrap(errInvalidChecksum, "read postings") - } - return newBigEndianPostings(d2.get()), nil + return newBigEndianPostings(d.get()), errors.Wrap(d.err(), "get postings bytes") } func (r *indexReader) SortedPostings(p Postings) Postings { diff --git a/index_test.go b/index_test.go index 7b908e39a..f18aba0eb 100644 --- a/index_test.go +++ b/index_test.go @@ -159,7 +159,7 @@ func TestIndexRW_Create_Open(t *testing.T) { require.NoError(t, err, "create index writer") require.NoError(t, iw.Close(), "close index writer") - ir, err := newIndexReader(dir) + ir, err := NewFileIndexReader(filepath.Join(dir, "index")) require.NoError(t, err, "open index reader") require.NoError(t, ir.Close(), "close index reader") @@ -169,7 +169,7 @@ func TestIndexRW_Create_Open(t *testing.T) { _, err = f.WriteAt([]byte{0, 0}, 0) require.NoError(t, err) - _, err = newIndexReader(dir) + _, err = NewFileIndexReader(dir) require.Error(t, err) } @@ -210,7 +210,7 @@ func TestIndexRW_Postings(t *testing.T) { require.NoError(t, iw.Close()) - ir, err := newIndexReader(dir) + ir, err := NewFileIndexReader(filepath.Join(dir, "index")) require.NoError(t, err, "open index reader") p, err := ir.Postings("a", "1") @@ -325,7 +325,7 @@ func TestPersistence_index_e2e(t *testing.T) { err = iw.Close() require.NoError(t, err) - ir, err := newIndexReader(dir) + ir, err := NewFileIndexReader(filepath.Join(dir, "index")) require.NoError(t, err) for p := range mi.postings.m { From d578b10d5cbcfd481b3c9cb82e028aa4f649a470 Mon Sep 17 00:00:00 2001 From: Fabian Reinartz Date: Fri, 10 Nov 2017 10:38:22 +0000 Subject: [PATCH 3/4] chunk: make reader accept abstract ByteSlice --- block.go | 2 +- chunks.go | 64 +++++++++++++++++++++++++++++++++++-------------------- 2 files changed, 42 insertions(+), 24 deletions(-) diff --git a/block.go b/block.go index 2d7bab2d8..034f33673 100644 --- a/block.go +++ b/block.go @@ -156,7 +156,7 @@ func OpenBlock(dir string, pool chunks.Pool) (*Block, error) { return nil, err } - cr, err := newChunkReader(chunkDir(dir), pool) + cr, err := NewDirChunkReader(chunkDir(dir), pool) if err != nil { return nil, err } diff --git a/chunks.go b/chunks.go index f6e329b79..bab0a0bad 100644 --- a/chunks.go +++ b/chunks.go @@ -298,7 +298,7 @@ type ChunkReader interface { // of series data. type chunkReader struct { // The underlying bytes holding the encoded series data. - bs [][]byte + bs []ByteSlice // Closers for resources behind the byte slices. cs []io.Closer @@ -306,8 +306,32 @@ type chunkReader struct { pool chunks.Pool } -// newChunkReader returns a new chunkReader based on mmaped files found in dir. -func newChunkReader(dir string, pool chunks.Pool) (*chunkReader, error) { +func newChunkReader(bs []ByteSlice, cs []io.Closer, pool chunks.Pool) (*chunkReader, error) { + cr := chunkReader{pool: pool, bs: bs, cs: cs} + + for i, b := range cr.bs { + if b.Len() < 4 { + return nil, errors.Wrapf(errInvalidSize, "validate magic in segment %d", i) + } + // Verify magic number. + if m := binary.BigEndian.Uint32(b.Range(0, 4)); m != MagicChunks { + return nil, fmt.Errorf("invalid magic number %x", m) + } + } + return &cr, nil +} + +// NewChunkReader returns a new chunk reader against the given byte slices. +func NewChunkReader(bs []ByteSlice, pool chunks.Pool) ChunkReader { + if pool == nil { + pool = chunks.NewPool() + } + return &chunkReader{bs: bs, pool: pool} +} + +// NewDirChunkReader returns a new ChunkReader against sequentially numbered files in the +// given directory. +func NewDirChunkReader(dir string, pool chunks.Pool) (ChunkReader, error) { files, err := sequenceFiles(dir) if err != nil { return nil, err @@ -315,27 +339,19 @@ func newChunkReader(dir string, pool chunks.Pool) (*chunkReader, error) { if pool == nil { pool = chunks.NewPool() } - cr := chunkReader{pool: pool} + + var bs []ByteSlice + var cs []io.Closer for _, fn := range files { f, err := openMmapFile(fn) if err != nil { return nil, errors.Wrapf(err, "mmap files") } - cr.cs = append(cr.cs, f) - cr.bs = append(cr.bs, f.b) + cs = append(cs, f) + bs = append(bs, realByteSlice(f.b)) } - - for i, b := range cr.bs { - if len(b) < 4 { - return nil, errors.Wrapf(errInvalidSize, "validate magic in segment %d", i) - } - // Verify magic number. - if m := binary.BigEndian.Uint32(b[:4]); m != MagicChunks { - return nil, fmt.Errorf("invalid magic number %x", m) - } - } - return &cr, nil + return newChunkReader(bs, cs, pool) } func (s *chunkReader) Close() error { @@ -352,16 +368,18 @@ func (s *chunkReader) Chunk(ref uint64) (chunks.Chunk, error) { } b := s.bs[seq] - if int(off) >= len(b) { - return nil, errors.Errorf("offset %d beyond data size %d", off, len(b)) + if int(off) >= b.Len() { + return nil, errors.Errorf("offset %d beyond data size %d", off, b.Len()) } - b = b[off:] + // With the minimum chunk length this should never cause us reading + // over the end of the slice. + r := b.Range(off, off+binary.MaxVarintLen32) - l, n := binary.Uvarint(b) + l, n := binary.Uvarint(r) if n < 0 { return nil, fmt.Errorf("reading chunk length failed") } - b = b[n:] + r = b.Range(off+n, off+n+int(l)) - return s.pool.Get(chunks.Encoding(b[0]), b[1:1+l]) + return s.pool.Get(chunks.Encoding(r[0]), r[1:1+l]) } From ac5bd71d8fa90c2045873e49505bae7d4bf83ec3 Mon Sep 17 00:00:00 2001 From: Fabian Reinartz Date: Fri, 10 Nov 2017 13:23:14 +0000 Subject: [PATCH 4/4] Doc fixes --- chunks.go | 4 ++-- index.go | 8 ++++---- 2 files changed, 6 insertions(+), 6 deletions(-) diff --git a/chunks.go b/chunks.go index bab0a0bad..30ab93f80 100644 --- a/chunks.go +++ b/chunks.go @@ -322,11 +322,11 @@ func newChunkReader(bs []ByteSlice, cs []io.Closer, pool chunks.Pool) (*chunkRea } // NewChunkReader returns a new chunk reader against the given byte slices. -func NewChunkReader(bs []ByteSlice, pool chunks.Pool) ChunkReader { +func NewChunkReader(bs []ByteSlice, pool chunks.Pool) (ChunkReader, error) { if pool == nil { pool = chunks.NewPool() } - return &chunkReader{bs: bs, pool: pool} + return newChunkReader(bs, nil, pool) } // NewDirChunkReader returns a new ChunkReader against sequentially numbered files in the diff --git a/index.go b/index.go index 277807e21..6895c16f4 100644 --- a/index.go +++ b/index.go @@ -605,11 +605,12 @@ func (b realByteSlice) Sub(start, end int) ByteSlice { return b[start:end] } -// NewIndexReader returns a new IndexReader on the given directory. +// NewIndexReader returns a new IndexReader on the given byte slice. func NewIndexReader(b ByteSlice) (IndexReader, error) { return newIndexReader(b, nil) } +// NewFileIndexReader returns a new index reader against the given index file. func NewFileIndexReader(path string) (IndexReader, error) { f, err := openMmapFile(path) if err != nil { @@ -618,7 +619,6 @@ func NewFileIndexReader(path string) (IndexReader, error) { return newIndexReader(realByteSlice(f.b), f) } -// newIndexReader returns a new indexReader on the given directory. func newIndexReader(b ByteSlice, c io.Closer) (*indexReader, error) { r := &indexReader{ b: b, @@ -660,7 +660,7 @@ func (r *indexReader) readTOC() error { d := decbuf{b: b[:len(b)-4]} if d.crc32() != expCRC { - return (errInvalidChecksum) + return errInvalidChecksum } r.toc.symbols = d.be64() @@ -697,7 +697,7 @@ func (r *indexReader) decbufAt(off int) decbuf { return dec } -// decbufAt returns a new decoding buffer. It expects the first bytes +// decbufUvarintAt returns a new decoding buffer. It expects the first bytes // after offset to hold the uvarint-encoded buffers length, followed by the contents and the expected // checksum. func (r *indexReader) decbufUvarintAt(off int) decbuf {