diff --git a/vendor/github.com/fabxc/tsdb/compact.go b/vendor/github.com/fabxc/tsdb/compact.go index 51d4ce0d1..573e9b440 100644 --- a/vendor/github.com/fabxc/tsdb/compact.go +++ b/vendor/github.com/fabxc/tsdb/compact.go @@ -163,6 +163,8 @@ func (c *compactor) Compact(dirs ...string) (err error) { if err != nil { return err } + defer b.Close() + blocks = append(blocks, b) } diff --git a/vendor/github.com/fabxc/tsdb/db.go b/vendor/github.com/fabxc/tsdb/db.go index b1a3a2421..bf8600fc4 100644 --- a/vendor/github.com/fabxc/tsdb/db.go +++ b/vendor/github.com/fabxc/tsdb/db.go @@ -6,10 +6,8 @@ import ( "fmt" "io" "io/ioutil" - "math" "os" "path/filepath" - "reflect" "strconv" "strings" "sync" @@ -334,6 +332,9 @@ func (db *DB) reloadBlocks() error { db.mtx.Lock() defer db.mtx.Unlock() + db.headmtx.Lock() + defer db.headmtx.Unlock() + dirs, err := blockDirs(db.dir) if err != nil { return errors.Wrap(err, "find blocks") @@ -355,17 +356,20 @@ func (db *DB) reloadBlocks() error { for i, meta := range metas { b, ok := db.seqBlocks[meta.Sequence] - if !ok { - return errors.Errorf("missing block for sequence %d", meta.Sequence) - } if meta.Compaction.Generation == 0 { + if !ok { + b, err = openHeadBlock(dirs[i], db.logger) + if err != nil { + return errors.Wrapf(err, "load head at %s", dirs[i]) + } + } if meta.ULID != b.Meta().ULID { return errors.Errorf("head block ULID changed unexpectedly") } heads = append(heads, b.(*headBlock)) } else { - if meta.ULID != b.Meta().ULID { + if ok && meta.ULID != b.Meta().ULID { if err := b.Close(); err != nil { return err } @@ -404,15 +408,18 @@ func (db *DB) Close() error { // the block to be used afterwards. db.mtx.Lock() - var merr MultiError + var g errgroup.Group for _, pb := range db.persisted { - merr.Add(pb.Close()) + g.Go(pb.Close) } for _, hb := range db.heads { - merr.Add(hb.Close()) + g.Go(hb.Close) } + var merr MultiError + + merr.Add(g.Wait()) merr.Add(db.lockf.Unlock()) return merr.Err() @@ -453,19 +460,6 @@ func (a *dbAppender) Add(lset labels.Labels, t int64, v float64) (uint64, error) return ref | (uint64(h.generation) << 40), nil } -func (a *dbAppender) hashedAdd(hash uint64, lset labels.Labels, t int64, v float64) (uint64, error) { - h, err := a.appenderFor(t) - if err != nil { - return 0, err - } - ref, err := h.hashedAdd(hash, lset, t, v) - if err != nil { - return 0, err - } - a.samples++ - return ref | (uint64(h.generation) << 40), nil -} - func (a *dbAppender) AddFast(ref uint64, t int64, v float64) error { // We store the head generation in the 4th byte and use it to reject // stale references. @@ -523,10 +517,9 @@ func (a *dbAppender) appenderFor(t int64) (*headAppender, error) { return nil, ErrNotFound } +// ensureHead makes sure that there is a head block for the timestamp t if +// it is within or after the currently appendable window. func (db *DB) ensureHead(t int64) error { - // db.mtx.Lock() - // defer db.mtx.Unlock() - // Initial case for a new database: we must create the first // AppendableBlocks-1 front padding heads. if len(db.heads) == 0 { @@ -717,123 +710,6 @@ func nextSequenceFile(dir, prefix string) (string, int, error) { return filepath.Join(dir, fmt.Sprintf("%s%0.6d", prefix, i+1)), int(i + 1), nil } -// PartitionedDB is a time series storage. -type PartitionedDB struct { - logger log.Logger - dir string - - partitionPow uint - Partitions []*DB -} - -func isPowTwo(x int) bool { - return x > 0 && (x&(x-1)) == 0 -} - -// OpenPartitioned or create a new DB. -func OpenPartitioned(dir string, n int, l log.Logger, r prometheus.Registerer, opts *Options) (*PartitionedDB, error) { - if !isPowTwo(n) { - return nil, errors.Errorf("%d is not a power of two", n) - } - if opts == nil { - opts = DefaultOptions - } - if l == nil { - l = log.NewLogfmtLogger(os.Stdout) - l = log.NewContext(l).With("ts", log.DefaultTimestampUTC, "caller", log.DefaultCaller) - } - - if err := os.MkdirAll(dir, 0777); err != nil { - return nil, err - } - c := &PartitionedDB{ - logger: l, - dir: dir, - partitionPow: uint(math.Log2(float64(n))), - } - - // Initialize vertical partitiondb. - // TODO(fabxc): validate partition number to be power of 2, which is required - // for the bitshift-modulo when finding the right partition. - for i := 0; i < n; i++ { - l := log.NewContext(l).With("partition", i) - d := partitionDir(dir, i) - - s, err := Open(d, l, r, opts) - if err != nil { - return nil, fmt.Errorf("initializing partition %q failed: %s", d, err) - } - - c.Partitions = append(c.Partitions, s) - } - - return c, nil -} - -func partitionDir(base string, i int) string { - return filepath.Join(base, fmt.Sprintf("p-%0.4d", i)) -} - -// Close the database. -func (db *PartitionedDB) Close() error { - var g errgroup.Group - - for _, partition := range db.Partitions { - g.Go(partition.Close) - } - - return g.Wait() -} - -// Appender returns a new appender against the database. -func (db *PartitionedDB) Appender() Appender { - app := &partitionedAppender{db: db} - - for _, p := range db.Partitions { - app.partitions = append(app.partitions, p.Appender().(*dbAppender)) - } - return app -} - -type partitionedAppender struct { - db *PartitionedDB - partitions []*dbAppender -} - -func (a *partitionedAppender) Add(lset labels.Labels, t int64, v float64) (uint64, error) { - h := lset.Hash() - p := h >> (64 - a.db.partitionPow) - - ref, err := a.partitions[p].hashedAdd(h, lset, t, v) - if err != nil { - return 0, err - } - return ref | (p << 48), nil -} - -func (a *partitionedAppender) AddFast(ref uint64, t int64, v float64) error { - p := uint8((ref << 8) >> 56) - return a.partitions[p].AddFast(ref, t, v) -} - -func (a *partitionedAppender) Commit() error { - var merr MultiError - - for _, p := range a.partitions { - merr.Add(p.Commit()) - } - return merr.Err() -} - -func (a *partitionedAppender) Rollback() error { - var merr MultiError - - for _, p := range a.partitions { - merr.Add(p.Rollback()) - } - return merr.Err() -} - // The MultiError type implements the error interface, and contains the // Errors used to construct it. type MultiError []error @@ -877,13 +753,7 @@ func (es MultiError) Err() error { } func yoloString(b []byte) string { - sh := (*reflect.SliceHeader)(unsafe.Pointer(&b)) - - h := reflect.StringHeader{ - Data: sh.Data, - Len: sh.Len, - } - return *((*string)(unsafe.Pointer(&h))) + return *((*string)(unsafe.Pointer(&b))) } func closeAll(cs ...io.Closer) error { diff --git a/vendor/github.com/fabxc/tsdb/db_amd64.go b/vendor/github.com/fabxc/tsdb/db_amd64.go deleted file mode 100644 index cfd85c975..000000000 --- a/vendor/github.com/fabxc/tsdb/db_amd64.go +++ /dev/null @@ -1,10 +0,0 @@ -package tsdb - -// maxMapSize represents the largest mmap size supported by Bolt. -const maxMapSize = 0xFFFFFFFFFFFF // 256TB - -// maxAllocSize is the size used when creating array pointers. -const maxAllocSize = 0x7FFFFFFF - -// Are unaligned load/stores broken on this arch? -var brokenUnaligned = false diff --git a/vendor/github.com/fabxc/tsdb/head.go b/vendor/github.com/fabxc/tsdb/head.go index 058c8fe79..211cadf77 100644 --- a/vendor/github.com/fabxc/tsdb/head.go +++ b/vendor/github.com/fabxc/tsdb/head.go @@ -44,7 +44,6 @@ type headBlock struct { activeWriters uint64 - symbols map[string]struct{} // descs holds all chunk descs for the head block. Each chunk implicitly // is assigned the index as its ID. series []*memSeries @@ -150,7 +149,7 @@ func (h *headBlock) Close() error { h.mtx.Lock() if err := h.wal.Close(); err != nil { - return err + return errors.Wrapf(err, "close WAL for head %s", h.dir) } // Check whether the head block still exists in the underlying dir // or has already been replaced with a compacted version or removed. @@ -223,10 +222,8 @@ type refdSample struct { } func (a *headAppender) Add(lset labels.Labels, t int64, v float64) (uint64, error) { - return a.hashedAdd(lset.Hash(), lset, t, v) -} + hash := lset.Hash() -func (a *headAppender) hashedAdd(hash uint64, lset labels.Labels, t int64, v float64) (uint64, error) { if ms := a.get(hash, lset); ms != nil { return uint64(ms.ref), a.AddFast(uint64(ms.ref), t, v) } @@ -530,13 +527,6 @@ func (h *headBlock) create(hash uint64, lset labels.Labels) *memSeries { return s } -func (h *headBlock) fullness() float64 { - h.metamtx.RLock() - defer h.metamtx.RUnlock() - - return float64(h.meta.Stats.NumSamples) / float64(h.meta.Stats.NumSeries+1) / 250 -} - func (h *headBlock) updateMapping() { h.mtx.RLock() @@ -586,7 +576,7 @@ type memSeries struct { lastValue float64 sampleBuf [4]sample - app chunks.Appender // Current appender for the chunkdb. + app chunks.Appender // Current appender for the chunk. } func (s *memSeries) cut() *memChunk { diff --git a/vendor/github.com/fabxc/tsdb/querier.go b/vendor/github.com/fabxc/tsdb/querier.go index 7783ef312..a397b9a63 100644 --- a/vendor/github.com/fabxc/tsdb/querier.go +++ b/vendor/github.com/fabxc/tsdb/querier.go @@ -76,6 +76,9 @@ func (s *DB) Querier(mint, maxt int64) Querier { } func (q *querier) LabelValues(n string) ([]string, error) { + if len(q.blocks) == 0 { + return nil, nil + } res, err := q.blocks[0].LabelValues(n) if err != nil { return nil, err @@ -163,12 +166,16 @@ func (q *blockQuerier) Select(ms ...labels.Matcher) SeriesSet { } return &blockSeriesSet{ - index: q.index, - chunks: q.chunks, - it: p, - absent: absent, - mint: q.mint, - maxt: q.maxt, + set: &populatedChunkSeries{ + set: &baseChunkSeries{ + p: p, + index: q.index, + absent: absent, + }, + chunks: q.chunks, + mint: q.mint, + maxt: q.maxt, + }, } } @@ -233,69 +240,6 @@ func (q *blockQuerier) Close() error { return nil } -// partitionedQuerier merges query results from a set of partition querieres. -type partitionedQuerier struct { - mint, maxt int64 - partitions []Querier -} - -// Querier returns a new querier over the database for the given -// time range. -func (db *PartitionedDB) Querier(mint, maxt int64) Querier { - q := &partitionedQuerier{ - mint: mint, - maxt: maxt, - } - for _, s := range db.Partitions { - q.partitions = append(q.partitions, s.Querier(mint, maxt)) - } - - return q -} - -func (q *partitionedQuerier) Select(ms ...labels.Matcher) SeriesSet { - // We gather the non-overlapping series from every partition and simply - // return their union. - r := &mergedSeriesSet{} - - for _, s := range q.partitions { - r.sets = append(r.sets, s.Select(ms...)) - } - if len(r.sets) == 0 { - return nopSeriesSet{} - } - return r -} - -func (q *partitionedQuerier) LabelValues(n string) ([]string, error) { - res, err := q.partitions[0].LabelValues(n) - if err != nil { - return nil, err - } - for _, sq := range q.partitions[1:] { - pr, err := sq.LabelValues(n) - if err != nil { - return nil, err - } - // Merge new values into deduplicated result. - res = mergeStrings(res, pr) - } - return res, nil -} - -func (q *partitionedQuerier) LabelValuesFor(string, labels.Label) ([]string, error) { - return nil, fmt.Errorf("not implemented") -} - -func (q *partitionedQuerier) Close() error { - var merr MultiError - - for _, sq := range q.partitions { - merr.Add(sq.Close()) - } - return merr.Err() -} - func mergeStrings(a, b []string) []string { maxl := len(a) if len(b) > len(a) { @@ -424,23 +368,31 @@ func (s *partitionSeriesSet) Next() bool { return true } -// blockSeriesSet is a set of series from an inverted index query. -type blockSeriesSet struct { - index IndexReader - chunks ChunkReader - it Postings // postings list referencing series - absent []string // labels that must not be set for result series - mint, maxt int64 // considered time range - - err error - cur Series +type chunkSeriesSet interface { + Next() bool + At() (labels.Labels, []ChunkMeta) + Err() error } -func (s *blockSeriesSet) Next() bool { - // Step through the postings iterator to find potential series. -outer: - for s.it.Next() { - lset, chunks, err := s.index.Series(s.it.At()) +// baseChunkSeries loads the label set and chunk references for a postings +// list from an index. It filters out series that have labels set that should be unset. +type baseChunkSeries struct { + p Postings + index IndexReader + absent []string // labels that must be unset in results. + + lset labels.Labels + chks []ChunkMeta + err error +} + +func (s *baseChunkSeries) At() (labels.Labels, []ChunkMeta) { return s.lset, s.chks } +func (s *baseChunkSeries) Err() error { return s.err } + +func (s *baseChunkSeries) Next() bool { +Outer: + for s.p.Next() { + lset, chunks, err := s.index.Series(s.p.At()) if err != nil { s.err = err return false @@ -449,35 +401,87 @@ outer: // If a series contains a label that must be absent, it is skipped as well. for _, abs := range s.absent { if lset.Get(abs) != "" { - continue outer + continue Outer } } - ser := &chunkSeries{ - labels: lset, - chunks: make([]ChunkMeta, 0, len(chunks)), - chunk: s.chunks.Chunk, - } - // Only use chunks that fit the time range. - for _, c := range chunks { + s.lset = lset + s.chks = chunks + + return true + } + if err := s.p.Err(); err != nil { + s.err = err + } + return false +} + +// populatedChunkSeries loads chunk data from a store for a set of series +// with known chunk references. It filters out chunks that do not fit the +// given time range. +type populatedChunkSeries struct { + set chunkSeriesSet + chunks ChunkReader + mint, maxt int64 + + err error + chks []ChunkMeta + lset labels.Labels +} + +func (s *populatedChunkSeries) At() (labels.Labels, []ChunkMeta) { return s.lset, s.chks } +func (s *populatedChunkSeries) Err() error { return s.err } + +func (s *populatedChunkSeries) Next() bool { + for s.set.Next() { + lset, chks := s.set.At() + + for i := range chks { + c := &chks[i] + if c.MaxTime < s.mint { + chks = chks[1:] continue } if c.MinTime > s.maxt { + chks = chks[:i] break } - ser.chunks = append(ser.chunks, c) + c.Chunk, s.err = s.chunks.Chunk(c.Ref) + if s.err != nil { + return false + } } - // If no chunks of the series apply to the time range, skip it. - if len(ser.chunks) == 0 { + if len(chks) == 0 { continue } - s.cur = ser + s.lset = lset + s.chks = chks + return true } - if s.it.Err() != nil { - s.err = s.it.Err() + if err := s.set.Err(); err != nil { + s.err = err + } + return false +} + +// blockSeriesSet is a set of series from an inverted index query. +type blockSeriesSet struct { + set chunkSeriesSet + err error + cur Series +} + +func (s *blockSeriesSet) Next() bool { + for s.set.Next() { + lset, chunks := s.set.At() + s.cur = &chunkSeries{labels: lset, chunks: chunks} + return true + } + if s.set.Err() != nil { + s.err = s.set.Err() } return false } @@ -490,10 +494,6 @@ func (s *blockSeriesSet) Err() error { return s.err } type chunkSeries struct { labels labels.Labels chunks []ChunkMeta // in-order chunk refs - - // chunk is a function that retrieves chunks based on a reference - // number contained in the chunk meta information. - chunk func(ref uint64) (chunks.Chunk, error) } func (s *chunkSeries) Labels() labels.Labels { @@ -501,21 +501,7 @@ func (s *chunkSeries) Labels() labels.Labels { } func (s *chunkSeries) Iterator() SeriesIterator { - var cs []chunks.Chunk - var mints []int64 - - for _, co := range s.chunks { - c, err := s.chunk(co.Ref) - if err != nil { - panic(err) // TODO(fabxc): add error series iterator. - } - cs = append(cs, c) - mints = append(mints, co.MinTime) - } - - // TODO(fabxc): consider pushing chunk retrieval further down. In practice, we - // probably have to touch all chunks anyway and it doesn't matter. - return newChunkSeriesIterator(mints, cs) + return newChunkSeriesIterator(s.chunks) } // SeriesIterator iterates over the data of a time series. @@ -601,43 +587,38 @@ func (it *chainedSeriesIterator) Err() error { // chunkSeriesIterator implements a series iterator on top // of a list of time-sorted, non-overlapping chunks. type chunkSeriesIterator struct { - mints []int64 // minimum timestamps for each iterator - chunks []chunks.Chunk + chunks []ChunkMeta i int cur chunks.Iterator } -func newChunkSeriesIterator(mints []int64, cs []chunks.Chunk) *chunkSeriesIterator { - if len(mints) != len(cs) { - panic("chunk references and chunks length don't match") - } +func newChunkSeriesIterator(cs []ChunkMeta) *chunkSeriesIterator { return &chunkSeriesIterator{ - mints: mints, chunks: cs, i: 0, - cur: cs[0].Iterator(), + cur: cs[0].Chunk.Iterator(), } } func (it *chunkSeriesIterator) Seek(t int64) (ok bool) { // Only do binary search forward to stay in line with other iterators // that can only move forward. - x := sort.Search(len(it.mints[it.i:]), func(i int) bool { return it.mints[i] >= t }) + x := sort.Search(len(it.chunks[it.i:]), func(i int) bool { return it.chunks[i].MinTime >= t }) x += it.i // If the timestamp was not found, it might be in the last chunk. - if x == len(it.mints) { + if x == len(it.chunks) { x-- } // Go to previous chunk if the chunk doesn't exactly start with t. // If we are already at the first chunk, we use it as it's the best we have. - if x > 0 && it.mints[x] > t { + if x > 0 && it.chunks[x].MinTime > t { x-- } it.i = x - it.cur = it.chunks[x].Iterator() + it.cur = it.chunks[x].Chunk.Iterator() for it.cur.Next() { t0, _ := it.cur.At() @@ -664,7 +645,7 @@ func (it *chunkSeriesIterator) Next() bool { } it.i++ - it.cur = it.chunks[it.i].Iterator() + it.cur = it.chunks[it.i].Chunk.Iterator() return it.Next() } diff --git a/vendor/github.com/fabxc/tsdb/reader.go b/vendor/github.com/fabxc/tsdb/reader.go deleted file mode 100644 index d4d816c1f..000000000 --- a/vendor/github.com/fabxc/tsdb/reader.go +++ /dev/null @@ -1,459 +0,0 @@ -package tsdb - -import ( - "encoding/binary" - "fmt" - "io" - "path/filepath" - "strings" - - "github.com/fabxc/tsdb/chunks" - "github.com/fabxc/tsdb/labels" - "github.com/pkg/errors" -) - -// ChunkReader provides reading access of serialized time series data. -type ChunkReader interface { - // Chunk returns the series data chunk with the given reference. - Chunk(ref uint64) (chunks.Chunk, error) - - // Close releases all underlying resources of the reader. - Close() error -} - -// chunkReader implements a SeriesReader for a serialized byte stream -// of series data. -type chunkReader struct { - // The underlying bytes holding the encoded series data. - bs [][]byte - - // Closers for resources behind the byte slices. - cs []io.Closer -} - -// newChunkReader returns a new chunkReader based on mmaped files found in dir. -func newChunkReader(dir string) (*chunkReader, error) { - files, err := sequenceFiles(dir, "") - if err != nil { - return nil, err - } - var cr chunkReader - - for _, fn := range files { - f, err := openMmapFile(fn) - if err != nil { - return nil, errors.Wrapf(err, "mmap files") - } - cr.cs = append(cr.cs, f) - cr.bs = append(cr.bs, f.b) - } - - for i, b := range cr.bs { - if len(b) < 4 { - return nil, errors.Wrapf(errInvalidSize, "validate magic in segment %d", i) - } - // Verify magic number. - if m := binary.BigEndian.Uint32(b[:4]); m != MagicSeries { - return nil, fmt.Errorf("invalid magic number %x", m) - } - } - return &cr, nil -} - -func (s *chunkReader) Close() error { - return closeAll(s.cs...) -} - -func (s *chunkReader) Chunk(ref uint64) (chunks.Chunk, error) { - var ( - seq = int(ref >> 32) - off = int((ref << 32) >> 32) - ) - if seq >= len(s.bs) { - return nil, errors.Errorf("reference sequence %d out of range", seq) - } - b := s.bs[seq] - - if int(off) >= len(b) { - return nil, errors.Errorf("offset %d beyond data size %d", off, len(b)) - } - b = b[off:] - - l, n := binary.Uvarint(b) - if n < 0 { - return nil, fmt.Errorf("reading chunk length failed") - } - b = b[n:] - enc := chunks.Encoding(b[0]) - - c, err := chunks.FromData(enc, b[1:1+l]) - if err != nil { - return nil, err - } - return c, nil -} - -// IndexReader provides reading access of serialized index data. -type IndexReader interface { - // LabelValues returns the possible label values - LabelValues(names ...string) (StringTuples, error) - - // Postings returns the postings list iterator for the label pair. - Postings(name, value string) (Postings, error) - - // Series returns the series for the given reference. - Series(ref uint32) (labels.Labels, []ChunkMeta, error) - - // LabelIndices returns the label pairs for which indices exist. - LabelIndices() ([][]string, error) - - // Close released the underlying resources of the reader. - Close() error -} - -// StringTuples provides access to a sorted list of string tuples. -type StringTuples interface { - // Total number of tuples in the list. - Len() int - // At returns the tuple at position i. - At(i int) ([]string, error) -} - -type indexReader struct { - // The underlying byte slice holding the encoded series data. - b []byte - - // Close that releases the underlying resources of the byte slice. - c io.Closer - - // Cached hashmaps of section offsets. - labels map[string]uint32 - postings map[string]uint32 -} - -var ( - errInvalidSize = fmt.Errorf("invalid size") - errInvalidFlag = fmt.Errorf("invalid flag") -) - -// newIndexReader returns a new indexReader on the given directory. -func newIndexReader(dir string) (*indexReader, error) { - f, err := openMmapFile(filepath.Join(dir, "index")) - if err != nil { - return nil, err - } - r := &indexReader{b: f.b, c: f} - - // Verify magic number. - if len(f.b) < 4 { - return nil, errors.Wrap(errInvalidSize, "index header") - } - if m := binary.BigEndian.Uint32(r.b[:4]); m != MagicIndex { - return nil, errors.Errorf("invalid magic number %x", m) - } - - // The last two 4 bytes hold the pointers to the hashmaps. - loff := binary.BigEndian.Uint32(r.b[len(r.b)-8 : len(r.b)-4]) - poff := binary.BigEndian.Uint32(r.b[len(r.b)-4:]) - - flag, b, err := r.section(loff) - if err != nil { - return nil, errors.Wrapf(err, "label index hashmap section at %d", loff) - } - if r.labels, err = readHashmap(flag, b); err != nil { - return nil, errors.Wrap(err, "read label index hashmap") - } - flag, b, err = r.section(poff) - if err != nil { - return nil, errors.Wrapf(err, "postings hashmap section at %d", loff) - } - if r.postings, err = readHashmap(flag, b); err != nil { - return nil, errors.Wrap(err, "read postings hashmap") - } - - return r, nil -} - -func readHashmap(flag byte, b []byte) (map[string]uint32, error) { - if flag != flagStd { - return nil, errInvalidFlag - } - h := make(map[string]uint32, 512) - - for len(b) > 0 { - l, n := binary.Uvarint(b) - if n < 1 { - return nil, errors.Wrap(errInvalidSize, "read key length") - } - b = b[n:] - - if len(b) < int(l) { - return nil, errors.Wrap(errInvalidSize, "read key") - } - s := string(b[:l]) - b = b[l:] - - o, n := binary.Uvarint(b) - if n < 1 { - return nil, errors.Wrap(errInvalidSize, "read offset value") - } - b = b[n:] - - h[s] = uint32(o) - } - - return h, nil -} - -func (r *indexReader) Close() error { - return r.c.Close() -} - -func (r *indexReader) section(o uint32) (byte, []byte, error) { - b := r.b[o:] - - if len(b) < 5 { - return 0, nil, errors.Wrap(errInvalidSize, "read header") - } - - flag := b[0] - l := binary.BigEndian.Uint32(b[1:5]) - - b = b[5:] - - // b must have the given length plus 4 bytes for the CRC32 checksum. - if len(b) < int(l)+4 { - return 0, nil, errors.Wrap(errInvalidSize, "section content") - } - return flag, b[:l], nil -} - -func (r *indexReader) lookupSymbol(o uint32) (string, error) { - if int(o) > len(r.b) { - return "", errors.Errorf("invalid symbol offset %d", o) - } - l, n := binary.Uvarint(r.b[o:]) - if n < 0 { - return "", errors.New("reading symbol length failed") - } - - end := int(o) + n + int(l) - if end > len(r.b) { - return "", errors.New("invalid length") - } - b := r.b[int(o)+n : end] - - return yoloString(b), nil -} - -func (r *indexReader) LabelValues(names ...string) (StringTuples, error) { - key := strings.Join(names, string(sep)) - off, ok := r.labels[key] - if !ok { - return nil, fmt.Errorf("label index doesn't exist") - } - - flag, b, err := r.section(off) - if err != nil { - return nil, errors.Wrapf(err, "section at %d", off) - } - if flag != flagStd { - return nil, errInvalidFlag - } - l, n := binary.Uvarint(b) - if n < 1 { - return nil, errors.Wrap(errInvalidSize, "read label index size") - } - - st := &serializedStringTuples{ - l: int(l), - b: b[n:], - lookup: r.lookupSymbol, - } - return st, nil -} - -func (r *indexReader) LabelIndices() ([][]string, error) { - res := [][]string{} - - for s := range r.labels { - res = append(res, strings.Split(s, string(sep))) - } - return res, nil -} - -func (r *indexReader) Series(ref uint32) (labels.Labels, []ChunkMeta, error) { - k, n := binary.Uvarint(r.b[ref:]) - if n < 1 { - return nil, nil, errors.Wrap(errInvalidSize, "number of labels") - } - - b := r.b[int(ref)+n:] - lbls := make(labels.Labels, 0, k) - - for i := 0; i < 2*int(k); i += 2 { - o, m := binary.Uvarint(b) - if m < 1 { - return nil, nil, errors.Wrap(errInvalidSize, "symbol offset") - } - n, err := r.lookupSymbol(uint32(o)) - if err != nil { - return nil, nil, errors.Wrap(err, "symbol lookup") - } - b = b[m:] - - o, m = binary.Uvarint(b) - if m < 1 { - return nil, nil, errors.Wrap(errInvalidSize, "symbol offset") - } - v, err := r.lookupSymbol(uint32(o)) - if err != nil { - return nil, nil, errors.Wrap(err, "symbol lookup") - } - b = b[m:] - - lbls = append(lbls, labels.Label{ - Name: n, - Value: v, - }) - } - - // Read the chunks meta data. - k, n = binary.Uvarint(b) - if n < 1 { - return nil, nil, errors.Wrap(errInvalidSize, "number of chunks") - } - - b = b[n:] - chunks := make([]ChunkMeta, 0, k) - - for i := 0; i < int(k); i++ { - firstTime, n := binary.Varint(b) - if n < 1 { - return nil, nil, errors.Wrap(errInvalidSize, "first time") - } - b = b[n:] - - lastTime, n := binary.Varint(b) - if n < 1 { - return nil, nil, errors.Wrap(errInvalidSize, "last time") - } - b = b[n:] - - o, n := binary.Uvarint(b) - if n < 1 { - return nil, nil, errors.Wrap(errInvalidSize, "chunk offset") - } - b = b[n:] - - chunks = append(chunks, ChunkMeta{ - Ref: o, - MinTime: firstTime, - MaxTime: lastTime, - }) - } - - return lbls, chunks, nil -} - -func (r *indexReader) Postings(name, value string) (Postings, error) { - key := name + string(sep) + value - - off, ok := r.postings[key] - if !ok { - return nil, ErrNotFound - } - - flag, b, err := r.section(off) - if err != nil { - return nil, errors.Wrapf(err, "section at %d", off) - } - - if flag != flagStd { - return nil, errors.Wrapf(errInvalidFlag, "section at %d", off) - } - - // TODO(fabxc): just read into memory as an intermediate solution. - // Add iterator over serialized data. - var l []uint32 - - for len(b) > 0 { - if len(b) < 4 { - return nil, errors.Wrap(errInvalidSize, "plain postings entry") - } - l = append(l, binary.BigEndian.Uint32(b[:4])) - - b = b[4:] - } - - return &listPostings{list: l, idx: -1}, nil -} - -type stringTuples struct { - l int // tuple length - s []string // flattened tuple entries -} - -func newStringTuples(s []string, l int) (*stringTuples, error) { - if len(s)%l != 0 { - return nil, errors.Wrap(errInvalidSize, "string tuple list") - } - return &stringTuples{s: s, l: l}, nil -} - -func (t *stringTuples) Len() int { return len(t.s) / t.l } -func (t *stringTuples) At(i int) ([]string, error) { return t.s[i : i+t.l], nil } - -func (t *stringTuples) Swap(i, j int) { - c := make([]string, t.l) - copy(c, t.s[i:i+t.l]) - - for k := 0; k < t.l; k++ { - t.s[i+k] = t.s[j+k] - t.s[j+k] = c[k] - } -} - -func (t *stringTuples) Less(i, j int) bool { - for k := 0; k < t.l; k++ { - d := strings.Compare(t.s[i+k], t.s[j+k]) - - if d < 0 { - return true - } - if d > 0 { - return false - } - } - return false -} - -type serializedStringTuples struct { - l int - b []byte - lookup func(uint32) (string, error) -} - -func (t *serializedStringTuples) Len() int { - // TODO(fabxc): Cache this? - return len(t.b) / (4 * t.l) -} - -func (t *serializedStringTuples) At(i int) ([]string, error) { - if len(t.b) < (i+t.l)*4 { - return nil, errInvalidSize - } - res := make([]string, 0, t.l) - - for k := 0; k < t.l; k++ { - offset := binary.BigEndian.Uint32(t.b[(i+k)*4:]) - - s, err := t.lookup(offset) - if err != nil { - return nil, errors.Wrap(err, "symbol lookup") - } - res = append(res, s) - } - - return res, nil -} diff --git a/vendor/github.com/fabxc/tsdb/wal.go b/vendor/github.com/fabxc/tsdb/wal.go index 8b88d110d..5b85ca3c9 100644 --- a/vendor/github.com/fabxc/tsdb/wal.go +++ b/vendor/github.com/fabxc/tsdb/wal.go @@ -448,7 +448,11 @@ func (r *WALReader) nextEntry() (WALEntryType, byte, []byte, error) { cr := r.rs[r.cur] et, flag, b, err := r.entry(cr) - if err == io.EOF { + // If we reached the end of the reader, advance to the next one + // and close. + // Do not close on the last one as it will still be appended to. + // XXX(fabxc): leaky abstraction. + if err == io.EOF && r.cur < len(r.rs)-1 { // Current reader completed, close and move to the next one. if err := cr.Close(); err != nil { return 0, 0, nil, err diff --git a/vendor/github.com/fabxc/tsdb/writer.go b/vendor/github.com/fabxc/tsdb/writer.go deleted file mode 100644 index 264052d32..000000000 --- a/vendor/github.com/fabxc/tsdb/writer.go +++ /dev/null @@ -1,611 +0,0 @@ -package tsdb - -import ( - "bufio" - "encoding/binary" - "hash" - "hash/crc32" - "io" - "os" - "path/filepath" - "sort" - "strings" - - "github.com/coreos/etcd/pkg/fileutil" - "github.com/fabxc/tsdb/chunks" - "github.com/fabxc/tsdb/labels" - "github.com/pkg/errors" -) - -const ( - // MagicSeries 4 bytes at the head of series file. - MagicSeries = 0x85BD40DD - - // MagicIndex 4 bytes at the head of an index file. - MagicIndex = 0xBAAAD700 -) - -const compactionPageBytes = minSectorSize * 64 - -// ChunkWriter serializes a time block of chunked series data. -type ChunkWriter interface { - // WriteChunks writes several chunks. The data field of the ChunkMetas - // must be populated. - // After returning successfully, the Ref fields in the ChunkMetas - // is set and can be used to retrieve the chunks from the written data. - WriteChunks(chunks ...ChunkMeta) error - - // Close writes any required finalization and closes the resources - // associated with the underlying writer. - Close() error -} - -// chunkWriter implements the ChunkWriter interface for the standard -// serialization format. -type chunkWriter struct { - dirFile *os.File - files []*os.File - wbuf *bufio.Writer - n int64 - crc32 hash.Hash - - segmentSize int64 -} - -const ( - defaultChunkSegmentSize = 512 * 1024 * 1024 - - chunksFormatV1 = 1 - indexFormatV1 = 1 -) - -func newChunkWriter(dir string) (*chunkWriter, error) { - if err := os.MkdirAll(dir, 0777); err != nil { - return nil, err - } - dirFile, err := fileutil.OpenDir(dir) - if err != nil { - return nil, err - } - cw := &chunkWriter{ - dirFile: dirFile, - n: 0, - crc32: crc32.New(crc32.MakeTable(crc32.Castagnoli)), - segmentSize: defaultChunkSegmentSize, - } - return cw, nil -} - -func (w *chunkWriter) tail() *os.File { - if len(w.files) == 0 { - return nil - } - return w.files[len(w.files)-1] -} - -// finalizeTail writes all pending data to the current tail file, -// truncates its size, and closes it. -func (w *chunkWriter) finalizeTail() error { - tf := w.tail() - if tf == nil { - return nil - } - - if err := w.wbuf.Flush(); err != nil { - return err - } - if err := fileutil.Fsync(tf); err != nil { - return err - } - // As the file was pre-allocated, we truncate any superfluous zero bytes. - off, err := tf.Seek(0, os.SEEK_CUR) - if err != nil { - return err - } - if err := tf.Truncate(off); err != nil { - return err - } - return tf.Close() -} - -func (w *chunkWriter) cut() error { - // Sync current tail to disk and close. - w.finalizeTail() - - p, _, err := nextSequenceFile(w.dirFile.Name(), "") - if err != nil { - return err - } - f, err := os.OpenFile(p, os.O_WRONLY|os.O_CREATE, 0666) - if err != nil { - return err - } - if err = fileutil.Preallocate(f, w.segmentSize, true); err != nil { - return err - } - if err = w.dirFile.Sync(); err != nil { - return err - } - - // Write header metadata for new file. - - metab := make([]byte, 8) - binary.BigEndian.PutUint32(metab[:4], MagicSeries) - metab[4] = chunksFormatV1 - - if _, err := f.Write(metab); err != nil { - return err - } - - w.files = append(w.files, f) - if w.wbuf != nil { - w.wbuf.Reset(f) - } else { - w.wbuf = bufio.NewWriterSize(f, 8*1024*1024) - } - w.n = 8 - - return nil -} - -func (w *chunkWriter) write(wr io.Writer, b []byte) error { - n, err := wr.Write(b) - w.n += int64(n) - return err -} - -func (w *chunkWriter) WriteChunks(chks ...ChunkMeta) error { - // Calculate maximum space we need and cut a new segment in case - // we don't fit into the current one. - maxLen := int64(binary.MaxVarintLen32) - for _, c := range chks { - maxLen += binary.MaxVarintLen32 + 1 - maxLen += int64(len(c.Chunk.Bytes())) - } - newsz := w.n + maxLen - - if w.wbuf == nil || w.n > w.segmentSize || newsz > w.segmentSize && maxLen <= w.segmentSize { - if err := w.cut(); err != nil { - return err - } - } - - // Write chunks sequentially and set the reference field in the ChunkMeta. - w.crc32.Reset() - wr := io.MultiWriter(w.crc32, w.wbuf) - - b := make([]byte, binary.MaxVarintLen32) - n := binary.PutUvarint(b, uint64(len(chks))) - - if err := w.write(wr, b[:n]); err != nil { - return err - } - seq := uint64(w.seq()) << 32 - - for i := range chks { - chk := &chks[i] - - chk.Ref = seq | uint64(w.n) - - n = binary.PutUvarint(b, uint64(len(chk.Chunk.Bytes()))) - - if err := w.write(wr, b[:n]); err != nil { - return err - } - if err := w.write(wr, []byte{byte(chk.Chunk.Encoding())}); err != nil { - return err - } - if err := w.write(wr, chk.Chunk.Bytes()); err != nil { - return err - } - chk.Chunk = nil - } - - if err := w.write(w.wbuf, w.crc32.Sum(nil)); err != nil { - return err - } - return nil -} - -func (w *chunkWriter) seq() int { - return len(w.files) - 1 -} - -func (w *chunkWriter) Close() error { - return w.finalizeTail() -} - -// ChunkMeta holds information about a chunk of data. -type ChunkMeta struct { - // Ref and Chunk hold either a reference that can be used to retrieve - // chunk data or the data itself. - // Generally, only one of them is set. - Ref uint64 - Chunk chunks.Chunk - - MinTime, MaxTime int64 // time range the data covers -} - -// IndexWriter serialized the index for a block of series data. -// The methods must generally be called in order they are specified. -type IndexWriter interface { - // AddSeries populates the index writer witha series and its offsets - // of chunks that the index can reference. - // The reference number is used to resolve a series against the postings - // list iterator. It only has to be available during the write processing. - AddSeries(ref uint32, l labels.Labels, chunks ...ChunkMeta) error - - // WriteLabelIndex serializes an index from label names to values. - // The passed in values chained tuples of strings of the length of names. - WriteLabelIndex(names []string, values []string) error - - // WritePostings writes a postings list for a single label pair. - WritePostings(name, value string, it Postings) error - - // Close writes any finalization and closes theresources associated with - // the underlying writer. - Close() error -} - -type indexWriterSeries struct { - labels labels.Labels - chunks []ChunkMeta // series file offset of chunks - offset uint32 // index file offset of series reference -} - -// indexWriter implements the IndexWriter interface for the standard -// serialization format. -type indexWriter struct { - f *os.File - bufw *bufio.Writer - n int64 - started bool - - // Reusable memory. - b []byte - uint32s []uint32 - - series map[uint32]*indexWriterSeries - symbols map[string]uint32 // symbol offsets - labelIndexes []hashEntry // label index offsets - postings []hashEntry // postings lists offsets - - crc32 hash.Hash -} - -func newIndexWriter(dir string) (*indexWriter, error) { - df, err := fileutil.OpenDir(dir) - if err != nil { - return nil, err - } - f, err := os.OpenFile(filepath.Join(dir, "index"), os.O_CREATE|os.O_WRONLY, 0666) - if err != nil { - return nil, err - } - if err := fileutil.Fsync(df); err != nil { - return nil, errors.Wrap(err, "sync dir") - } - - iw := &indexWriter{ - f: f, - bufw: bufio.NewWriterSize(f, 1<<22), - n: 0, - - // Reusable memory. - b: make([]byte, 0, 1<<23), - uint32s: make([]uint32, 0, 1<<15), - - // Caches. - symbols: make(map[string]uint32, 1<<13), - series: make(map[uint32]*indexWriterSeries, 1<<16), - crc32: crc32.New(crc32.MakeTable(crc32.Castagnoli)), - } - if err := iw.writeMeta(); err != nil { - return nil, err - } - return iw, nil -} - -func (w *indexWriter) write(wr io.Writer, b []byte) error { - n, err := wr.Write(b) - w.n += int64(n) - return err -} - -// section writes a CRC32 checksummed section of length l and guarded by flag. -func (w *indexWriter) section(l int, flag byte, f func(w io.Writer) error) error { - w.crc32.Reset() - wr := io.MultiWriter(w.crc32, w.bufw) - - b := [5]byte{flag, 0, 0, 0, 0} - binary.BigEndian.PutUint32(b[1:], uint32(l)) - - if err := w.write(wr, b[:]); err != nil { - return errors.Wrap(err, "writing header") - } - - if err := f(wr); err != nil { - return errors.Wrap(err, "write contents") - } - if err := w.write(w.bufw, w.crc32.Sum(nil)); err != nil { - return errors.Wrap(err, "writing checksum") - } - return nil -} - -func (w *indexWriter) writeMeta() error { - b := [8]byte{} - - binary.BigEndian.PutUint32(b[:4], MagicIndex) - b[4] = flagStd - - return w.write(w.bufw, b[:]) -} - -func (w *indexWriter) AddSeries(ref uint32, lset labels.Labels, chunks ...ChunkMeta) error { - if _, ok := w.series[ref]; ok { - return errors.Errorf("series with reference %d already added", ref) - } - // Populate the symbol table from all label sets we have to reference. - for _, l := range lset { - w.symbols[l.Name] = 0 - w.symbols[l.Value] = 0 - } - - w.series[ref] = &indexWriterSeries{ - labels: lset, - chunks: chunks, - } - return nil -} - -func (w *indexWriter) writeSymbols() error { - // Generate sorted list of strings we will store as reference table. - symbols := make([]string, 0, len(w.symbols)) - for s := range w.symbols { - symbols = append(symbols, s) - } - sort.Strings(symbols) - - // The start of the section plus a 5 byte section header are our base. - // TODO(fabxc): switch to relative offsets and hold sections in a TOC. - base := uint32(w.n) + 5 - - buf := [binary.MaxVarintLen32]byte{} - w.b = append(w.b[:0], flagStd) - - for _, s := range symbols { - w.symbols[s] = base + uint32(len(w.b)) - - n := binary.PutUvarint(buf[:], uint64(len(s))) - w.b = append(w.b, buf[:n]...) - w.b = append(w.b, s...) - } - - return w.section(len(w.b), flagStd, func(wr io.Writer) error { - return w.write(wr, w.b) - }) -} - -type indexWriterSeriesSlice []*indexWriterSeries - -func (s indexWriterSeriesSlice) Len() int { return len(s) } -func (s indexWriterSeriesSlice) Swap(i, j int) { s[i], s[j] = s[j], s[i] } - -func (s indexWriterSeriesSlice) Less(i, j int) bool { - return labels.Compare(s[i].labels, s[j].labels) < 0 -} - -func (w *indexWriter) writeSeries() error { - // Series must be stored sorted along their labels. - series := make(indexWriterSeriesSlice, 0, len(w.series)) - - for _, s := range w.series { - series = append(series, s) - } - sort.Sort(series) - - // Current end of file plus 5 bytes for section header. - // TODO(fabxc): switch to relative offsets. - base := uint32(w.n) + 5 - - w.b = w.b[:0] - buf := make([]byte, binary.MaxVarintLen64) - - for _, s := range series { - // Write label set symbol references. - s.offset = base + uint32(len(w.b)) - - n := binary.PutUvarint(buf, uint64(len(s.labels))) - w.b = append(w.b, buf[:n]...) - - for _, l := range s.labels { - n = binary.PutUvarint(buf, uint64(w.symbols[l.Name])) - w.b = append(w.b, buf[:n]...) - n = binary.PutUvarint(buf, uint64(w.symbols[l.Value])) - w.b = append(w.b, buf[:n]...) - } - - // Write chunks meta data including reference into chunk file. - n = binary.PutUvarint(buf, uint64(len(s.chunks))) - w.b = append(w.b, buf[:n]...) - - for _, c := range s.chunks { - n = binary.PutVarint(buf, c.MinTime) - w.b = append(w.b, buf[:n]...) - n = binary.PutVarint(buf, c.MaxTime) - w.b = append(w.b, buf[:n]...) - - n = binary.PutUvarint(buf, uint64(c.Ref)) - w.b = append(w.b, buf[:n]...) - } - } - - return w.section(len(w.b), flagStd, func(wr io.Writer) error { - return w.write(wr, w.b) - }) -} - -func (w *indexWriter) init() error { - if err := w.writeSymbols(); err != nil { - return err - } - if err := w.writeSeries(); err != nil { - return err - } - w.started = true - - return nil -} - -func (w *indexWriter) WriteLabelIndex(names []string, values []string) error { - if !w.started { - if err := w.init(); err != nil { - return err - } - } - - valt, err := newStringTuples(values, len(names)) - if err != nil { - return err - } - sort.Sort(valt) - - w.labelIndexes = append(w.labelIndexes, hashEntry{ - name: strings.Join(names, string(sep)), - offset: uint32(w.n), - }) - - buf := make([]byte, binary.MaxVarintLen32) - n := binary.PutUvarint(buf, uint64(len(names))) - - l := n + len(values)*4 - - return w.section(l, flagStd, func(wr io.Writer) error { - // First byte indicates tuple size for index. - if err := w.write(wr, buf[:n]); err != nil { - return err - } - - for _, v := range valt.s { - binary.BigEndian.PutUint32(buf, w.symbols[v]) - - if err := w.write(wr, buf[:4]); err != nil { - return err - } - } - return nil - }) -} - -func (w *indexWriter) WritePostings(name, value string, it Postings) error { - if !w.started { - if err := w.init(); err != nil { - return err - } - } - - key := name + string(sep) + value - - w.postings = append(w.postings, hashEntry{ - name: key, - offset: uint32(w.n), - }) - - // Order of the references in the postings list does not imply order - // of the series references within the persisted block they are mapped to. - // We have to sort the new references again. - refs := w.uint32s[:0] - - for it.Next() { - s, ok := w.series[it.At()] - if !ok { - return errors.Errorf("series for reference %d not found", it.At()) - } - refs = append(refs, s.offset) - } - if err := it.Err(); err != nil { - return err - } - - sort.Sort(uint32slice(refs)) - - w.b = w.b[:0] - buf := make([]byte, 4) - - for _, r := range refs { - binary.BigEndian.PutUint32(buf, r) - w.b = append(w.b, buf...) - } - - w.uint32s = refs[:0] - - return w.section(len(w.b), flagStd, func(wr io.Writer) error { - return w.write(wr, w.b) - }) -} - -type uint32slice []uint32 - -func (s uint32slice) Len() int { return len(s) } -func (s uint32slice) Swap(i, j int) { s[i], s[j] = s[j], s[i] } -func (s uint32slice) Less(i, j int) bool { return s[i] < s[j] } - -type hashEntry struct { - name string - offset uint32 -} - -func (w *indexWriter) writeHashmap(h []hashEntry) error { - w.b = w.b[:0] - buf := [binary.MaxVarintLen32]byte{} - - for _, e := range h { - n := binary.PutUvarint(buf[:], uint64(len(e.name))) - w.b = append(w.b, buf[:n]...) - w.b = append(w.b, e.name...) - - n = binary.PutUvarint(buf[:], uint64(e.offset)) - w.b = append(w.b, buf[:n]...) - } - - return w.section(len(w.b), flagStd, func(wr io.Writer) error { - return w.write(wr, w.b) - }) -} - -func (w *indexWriter) finalize() error { - // Write out hash maps to jump to correct label index and postings sections. - lo := uint32(w.n) - if err := w.writeHashmap(w.labelIndexes); err != nil { - return err - } - - po := uint32(w.n) - if err := w.writeHashmap(w.postings); err != nil { - return err - } - - // Terminate index file with offsets to hashmaps. This is the entry Pointer - // for any index query. - // TODO(fabxc): also store offset to series section to allow plain - // iteration over all existing series? - b := [8]byte{} - binary.BigEndian.PutUint32(b[:4], lo) - binary.BigEndian.PutUint32(b[4:], po) - - return w.write(w.bufw, b[:]) -} - -func (w *indexWriter) Close() error { - if err := w.finalize(); err != nil { - return err - } - if err := w.bufw.Flush(); err != nil { - return err - } - if err := fileutil.Fsync(w.f); err != nil { - return err - } - return w.f.Close() -} diff --git a/vendor/vendor.json b/vendor/vendor.json index e183acfcf..49d0513f9 100644 --- a/vendor/vendor.json +++ b/vendor/vendor.json @@ -368,10 +368,10 @@ "revisionTime": "2016-09-30T00:14:02Z" }, { - "checksumSHA1": "IOnF9CNVjOBoVwdfzfUEv/+JotI=", + "checksumSHA1": "Aj4Cn1RClamxluIri/LQMnK/yB4=", "path": "github.com/fabxc/tsdb", - "revision": "55a9b5428aceb644b3b297d7a9fd63d0354ce953", - "revisionTime": "2017-03-04T15:50:48Z" + "revision": "ca1bc920b795cfc670002e7643471b0277e79a9b", + "revisionTime": "2017-03-08T15:54:13Z" }, { "checksumSHA1": "uVzWuLvF646YjiKomsc2CR1ua58=",