From 10943b6d88c19787bdf75d6193239a1f632f73ec Mon Sep 17 00:00:00 2001 From: Fabian Reinartz Date: Mon, 12 Dec 2016 08:12:19 +0100 Subject: [PATCH] Add initial index reader implementation --- reader.go | 143 +++++++++++++++++++++++++++++++++++++++++++++++++++--- writer.go | 15 +++--- 2 files changed, 143 insertions(+), 15 deletions(-) diff --git a/reader.go b/reader.go index a23efa5d5..9e9129665 100644 --- a/reader.go +++ b/reader.go @@ -3,14 +3,15 @@ package tsdb import ( "encoding/binary" "fmt" + "strings" "github.com/fabxc/tsdb/chunks" ) // SeriesReader provides reading access of serialized time series data. type SeriesReader interface { - // Chunk returns the series data chunk at the given offset. - Chunk(offset uint32) (chunks.Chunk, error) + // Chunk returns the series data chunk with the given reference. + Chunk(ref uint32) (chunks.Chunk, error) } // seriesReader implements a SeriesReader for a serialized byte stream @@ -47,16 +48,144 @@ func (s *seriesReader) Chunk(offset uint32) (chunks.Chunk, error) { // IndexReader provides reading access of serialized index data. type IndexReader interface { - // + // Stats returns statisitics about the indexed data. + Stats() (*BlockStats, error) - // Close releases resources associated with the reader. - Close() + // LabelValues returns the possible label values + LabelValues(names ...string) (StringTuples, error) + + // Postings returns the postings list iteartor for the label pair. + Postings(name, value string) (Iterator, error) + + // Series returns the series for the given reference. + Series(ref uint32) (Series, error) +} + +// StringTuple is a tuple of strings. +type StringTuple []string + +// StringTuples provides access to a sorted list of string tuples. +type StringTuples interface { + // Total number of tuples in the list. + Len() int + // At returns the tuple at position i. + At(i int) (StringTuple, error) } type indexReader struct { // The underlying byte slice holding the encoded series data. b []byte - // Cached hashmaps of sections for label values - labelOffsets map[string]uint32 + // Cached hashmaps of section offsets. + labels map[string]uint32 + postings map[string]uint32 +} + +var ( + errInvalidSize = fmt.Errorf("invalid size") + errInvalidFlag = fmt.Errorf("invalid flag") +) + +func newIndexReader(b []byte) (*indexReader, error) { + if len(b) < 16 { + return nil, errInvalidSize + } + r := &indexReader{b: b} + + // Verify magic number. + if m := binary.BigEndian.Uint32(b[:4]); m != MagicIndex { + return nil, fmt.Errorf("invalid magic number %x", m) + } + + var err error + // The last two 4 bytes hold the pointers to the hashmaps. + loff := binary.BigEndian.Uint32(b[len(b)-8 : len(b)-4]) + poff := binary.BigEndian.Uint32(b[len(b)-4:]) + + if r.labels, err = readHashmap(r.section(loff)); err != nil { + return nil, err + } + if r.postings, err = readHashmap(r.section(poff)); err != nil { + return nil, err + } + + return r, nil +} + +func readHashmap(flag byte, b []byte, err error) (map[string]uint32, error) { + if err != nil { + return nil, err + } + if flag != flagStd { + return nil, errInvalidFlag + } + h := make(map[string]uint32, 512) + + for len(b) > 0 { + l, n := binary.Uvarint(b) + if n < 1 { + return nil, errInvalidSize + } + s := string(b[n : n+int(l)]) + b = b[n+int(l):] + + o, n := binary.Uvarint(b) + if n < 1 { + return nil, errInvalidSize + } + b = b[n:] + + h[s] = uint32(o) + } + + return h, nil +} + +func (r *indexReader) section(o uint32) (byte, []byte, error) { + b := r.b[o:] + + if len(b) < 5 { + return 0, nil, errInvalidSize + } + + flag := r.b[0] + l := binary.BigEndian.Uint32(b[1:5]) + + b = b[5:] + + if len(b) < int(l) { + return 0, nil, errInvalidSize + } + return flag, b, nil +} + +func (r *indexReader) lookupSymbol(o uint32) ([]byte, error) { + l, n := binary.Uvarint(r.b[o:]) + if n < 0 { + return nil, fmt.Errorf("reading symbol length failed") + } + + end := int(o) + n + int(l) + if end > len(r.b) { + return nil, fmt.Errorf("invalid length") + } + + return r.b[int(o)+n : end], nil +} + +func (r *indexReader) Stats() (*BlockStats, error) { + return nil, nil +} + +func (r *indexReader) LabelValues(names ...string) (StringTuples, error) { + key := strings.Join(names, string(sep)) + off, ok := r.labels[key] + if !ok { + return nil, fmt.Errorf("label index doesn't exist") + } + + flag, b, err := r.section(off) + if err != nil { + return nil, fmt.Errorf("section: %s", err) + } } diff --git a/writer.go b/writer.go index 1ca0078ec..13588e38d 100644 --- a/writer.go +++ b/writer.go @@ -217,9 +217,8 @@ func (w *indexWriter) section(l uint32, flag byte, f func(w io.Writer) error) er h := crc32.NewIEEE() wr := io.MultiWriter(h, w.w) - b := [5]byte{} - binary.BigEndian.PutUint32(b[:4], l) - b[4] = flagStd + b := [5]byte{flagStd, 0, 0, 0, 0} + binary.BigEndian.PutUint32(b[1:], l) if err := w.write(wr, b[:]); err != nil { return err @@ -379,15 +378,15 @@ type hashEntry struct { func (w *indexWriter) writeHashmap(h []hashEntry) error { b := make([]byte, 0, 4096) - buf := make([]byte, 4) + buf := [binary.MaxVarintLen32]byte{} for _, e := range h { - binary.PutUvarint(buf, uint64(len(e.name))) - b = append(b, buf...) + n := binary.PutUvarint(buf[:], uint64(len(e.name))) + b = append(b, buf[:n]...) b = append(b, e.name...) - binary.BigEndian.PutUint32(buf, e.offset) - b = append(b, buf...) + n = binary.PutUvarint(buf[:], uint64(e.offset)) + b = append(b, buf[:n]...) } return w.section(uint32(len(buf)), flagStd, func(wr io.Writer) error {