diff --git a/adapter.go b/adapter.go deleted file mode 100644 index 9774ab18a..000000000 --- a/adapter.go +++ /dev/null @@ -1,223 +0,0 @@ -package tsdb - -// import ( -// "errors" -// "fmt" -// "time" - -// "github.com/prometheus/common/model" -// "github.com/prometheus/prometheus/storage/local" -// "github.com/prometheus/prometheus/storage/metric" -// "golang.org/x/net/context" -// ) - -// type DefaultSeriesIterator struct { -// s Series -// it SeriesIterator -// } - -// func (it *DefaultSeriesIterator) ValueAtOrBeforeTime(ts model.Time) model.SamplePair { -// ok := it.it.Seek(int64(ts)) -// if !ok { -// return model.SamplePair{Timestamp: model.Earliest} -// } -// t, v := it.it.Values() -// return model.SamplePair{Timestamp: model.Time(t), Value: model.SampleValue(v)} -// } - -// func (it *DefaultSeriesIterator) Metric() metric.Metric { -// ls := it.s.Labels() -// met := make(model.Metric, len(ls)) -// for _, l := range ls { -// met[model.LabelName(l.Name)] = model.LabelValue(l.Value) -// } -// return metric.Metric{Metric: met, Copied: true} -// } - -// func (it *DefaultSeriesIterator) RangeValues(interval metric.Interval) []model.SamplePair { -// var res []model.SamplePair - -// for ok := it.it.Seek(int64(interval.NewestInclusive)); ok; ok = it.it.Next() { -// t, v := it.it.Values() -// if model.Time(t) > interval.OldestInclusive { -// break -// } -// res = append(res, model.SamplePair{Timestamp: model.Time(t), Value: model.SampleValue(v)}) -// } -// return res -// } - -// func (it *DefaultSeriesIterator) Close() { -// } - -// // DefaultAdapter wraps a tsdb storage to implement the default -// // storage interface. -// type DefaultAdapter struct { -// db *DB -// } - -// func NewDefaultAdapter(db *DB) *DefaultAdapter { -// return &DefaultAdapter{db: db} -// } - -// // Drop all time series associated with the given label matchers. Returns -// // the number series that were dropped. -// func (da *DefaultAdapter) DropMetricsForLabelMatchers(context.Context, ...*metric.LabelMatcher) (int, error) { -// return 0, fmt.Errorf("not implemented") -// } - -// // Start the various maintenance loops in goroutines. Returns when the -// // storage is ready to use. Keeps everything running in the background -// // until Stop is called. -// func (da *DefaultAdapter) Start() error { -// return nil -// } - -// // Stop shuts down the Storage gracefully, flushes all pending -// // operations, stops all maintenance loops,and frees all resources. -// func (da *DefaultAdapter) Stop() error { -// return da.db.Close() -// } - -// // WaitForIndexing returns once all samples in the storage are -// // indexed. Indexing is needed for FingerprintsForLabelMatchers and -// // LabelValuesForLabelName and may lag behind. -// func (da *DefaultAdapter) WaitForIndexing() { -// } - -// func (da *DefaultAdapter) Append(s *model.Sample) error { -// labels := make([]Label, len(s.Metric)) -// for k, v := range s.Metric { -// labels = append(labels, Label{Name: string(k), Value: string(v)}) -// } -// // Ignore the Scrape batching for now. -// return da.db.appendSingle(labels, int64(s.Timestamp), float64(s.Value)) -// } - -// func (da *DefaultAdapter) NeedsThrottling() bool { -// return false -// } - -// func (da *DefaultAdapter) Querier() (local.Querier, error) { -// // q, err := da.db.Querier() -// // if err != nil { -// // return nil, err -// // } - -// return defaultQuerierAdapter{q: q}, nil -// } - -// type defaultQuerierAdapter struct { -// q Querier -// } - -// func (da defaultQuerierAdapter) Close() error { -// return da.q.Close() -// } - -// // QueryRange returns a list of series iterators for the selected -// // time range and label matchers. The iterators need to be closed -// // after usage. -// func (da defaultQuerierAdapter) QueryRange(ctx context.Context, from, through model.Time, matchers ...*metric.LabelMatcher) ([]local.SeriesIterator, error) { -// it, err := labelMatchersIter(da.q, matchers...) -// if err != nil { -// return nil, err -// } -// its, err := da.q.Series(it) -// if err != nil { -// return nil, err -// } -// var defaultIts []local.SeriesIterator -// for _, it := range its { -// defaultIts = append(defaultIts, &DefaultSeriesIterator{it: it}) -// } -// return defaultIts, nil -// } - -// // QueryInstant returns a list of series iterators for the selected -// // instant and label matchers. The iterators need to be closed after usage. -// func (da defaultQuerierAdapter) QueryInstant(ctx context.Context, ts model.Time, stalenessDelta time.Duration, matchers ...*metric.LabelMatcher) ([]local.SeriesIterator, error) { -// return da.QueryRange(ctx, ts.Add(-stalenessDelta), ts, matchers...) -// } - -// // MetricsForLabelMatchers returns the metrics from storage that satisfy -// // the given sets of label matchers. Each set of matchers must contain at -// // least one label matcher that does not match the empty string. Otherwise, -// // an empty list is returned. Within one set of matchers, the intersection -// // of matching series is computed. The final return value will be the union -// // of the per-set results. The times from and through are hints for the -// // storage to optimize the search. The storage MAY exclude metrics that -// // have no samples in the specified interval from the returned map. In -// // doubt, specify model.Earliest for from and model.Latest for through. -// func (da defaultQuerierAdapter) MetricsForLabelMatchers(ctx context.Context, from, through model.Time, matcherSets ...metric.LabelMatchers) ([]metric.Metric, error) { -// var mits []index.Iterator -// for _, ms := range matcherSets { -// it, err := labelMatchersIter(da.q, ms...) -// if err != nil { -// return nil, err -// } -// mits = append(mits, it) -// } - -// res, err := da.q.Metrics(index.Merge(mits...)) -// if err != nil { -// return nil, err -// } -// var mres []metric.Metric -// for _, m := range res { -// met := make(model.Metric, len(m)) -// for k, v := range m { -// met[model.LabelName(k)] = model.LabelValue(v) -// } -// mres = append(mres, metric.Metric{Metric: met, Copied: true}) -// } -// return mres, nil -// } - -// func labelMatchersIter(q *Querier, ms ...*metric.LabelMatcher) (index.Iterator, error) { -// var its []index.Iterator -// for _, m := range ms { -// var matcher index.Matcher -// switch m.Type { -// case metric.Equal: -// matcher = index.NewEqualMatcher(string(m.Value)) -// case metric.RegexMatch: -// var err error -// matcher, err = index.NewRegexpMatcher(string(m.Value)) -// if err != nil { -// return nil, err -// } -// default: -// return nil, fmt.Errorf("matcher type %q not supported", m.Type) -// } -// it, err := q.Iterator(string(m.Name), matcher) -// if err != nil { -// return nil, err -// } -// its = append(its, it) -// } -// if len(its) == 0 { -// return nil, errors.New("not found") -// } -// return index.Intersect(its...), nil -// } - -// // LastSampleForFingerprint returns the last sample that has been -// // ingested for the given sets of label matchers. If this instance of the -// // Storage has never ingested a sample for the provided fingerprint (or -// // the last ingestion is so long ago that the series has been archived), -// // ZeroSample is returned. The label matching behavior is the same as in -// // MetricsForLabelMatchers. -// func (da defaultQuerierAdapter) LastSampleForLabelMatchers(ctx context.Context, cutoff model.Time, matcherSets ...metric.LabelMatchers) (model.Vector, error) { -// return nil, fmt.Errorf("not implemented") -// } - -// // Get all of the label values that are associated with a given label name. -// func (da defaultQuerierAdapter) LabelValuesForLabelName(ctx context.Context, ln model.LabelName) (model.LabelValues, error) { -// res := da.q.iq.Terms(string(ln), nil) -// resv := model.LabelValues{} -// for _, lv := range res { -// resv = append(resv, model.LabelValue(lv)) -// } -// return resv, nil -// } diff --git a/db.go b/db.go index 78aa7ad84..8842978dd 100644 --- a/db.go +++ b/db.go @@ -12,7 +12,7 @@ import ( "github.com/cespare/xxhash" "github.com/fabxc/tsdb/chunks" - "github.com/prometheus/common/log" + "github.com/go-kit/kit/log" ) // DefaultOptions used for the DB. They are sane for setups using @@ -37,7 +37,7 @@ type DB struct { // TODO(fabxc): make configurable const ( - seriesShardShift = 3 + seriesShardShift = 2 numSeriesShards = 1 << seriesShardShift maxChunkSize = 1024 ) @@ -50,6 +50,10 @@ func Open(path string, l log.Logger, opts *Options) (*DB, error) { if err := os.MkdirAll(path, 0777); err != nil { return nil, err } + if l == nil { + l = log.NewLogfmtLogger(os.Stdout) + l = log.NewContext(l).With("ts", log.DefaultTimestampUTC, "caller", log.DefaultCaller) + } c := &DB{ logger: l, @@ -62,7 +66,8 @@ func Open(path string, l log.Logger, opts *Options) (*DB, error) { // for the bitshift-modulo when finding the right shard. for i := 0; i < numSeriesShards; i++ { path := filepath.Join(path, fmt.Sprintf("%d", i)) - c.shards = append(c.shards, NewSeriesShard(path, l.With("shard", i))) + l := log.NewContext(l).With("shard", i) + c.shards = append(c.shards, NewSeriesShard(path, l)) } // TODO(fabxc): run background compaction + GC. @@ -170,7 +175,8 @@ func (db *DB) AppendVector(ts int64, v *Vector) error { return nil } -func (db *DB) appendSingle(lset Labels, ts int64, v float64) error { +func (db *DB) AppendSingle(lset Labels, ts int64, v float64) error { + sort.Sort(lset) h := lset.Hash() s := uint16(h >> (64 - seriesShardShift)) @@ -190,7 +196,6 @@ const sep = '\xff' type SeriesShard struct { path string persistCh chan struct{} - done chan struct{} logger log.Logger mtx sync.RWMutex @@ -203,7 +208,6 @@ func NewSeriesShard(path string, logger log.Logger) *SeriesShard { s := &SeriesShard{ path: path, persistCh: make(chan struct{}, 1), - done: make(chan struct{}), logger: logger, // TODO(fabxc): restore from checkpoint. // TODO(fabxc): provide access to persisted blocks. @@ -218,7 +222,6 @@ func NewSeriesShard(path string, logger log.Logger) *SeriesShard { // Close the series shard. func (s *SeriesShard) Close() error { - close(s.done) return nil } @@ -255,7 +258,7 @@ func (s *SeriesShard) appendBatch(ts int64, samples []Sample) error { // blocksForRange returns all blocks within the shard that may contain // data for the given time range. func (s *SeriesShard) blocksForRange(mint, maxt int64) (bs []Block) { - return nil + return []Block{s.head} } // TODO(fabxc): make configurable. @@ -325,10 +328,7 @@ func (s *SeriesShard) persist() error { sz := fmt.Sprintf("%.2fMiB", float64(sw.Size()+iw.Size())/1024/1024) - s.logger.With("size", sz). - With("samples", head.samples). - With("chunks", head.stats().chunks). - Debug("persisted head") + s.logger.Log("size", sz, "samples", head.samples, "chunks", head.stats().chunks, "msg", "persisted head") return nil } diff --git a/head.go b/head.go index 73db76086..73342e463 100644 --- a/head.go +++ b/head.go @@ -2,6 +2,7 @@ package tsdb import ( "math" + "sort" "sync" "github.com/fabxc/tsdb/chunks" @@ -27,6 +28,66 @@ func NewHeadBlock(baseTime int64) *HeadBlock { } } +// Querier returns a new querier over the head block. +func (h *HeadBlock) Querier(mint, maxt int64) Querier { + return newBlockQuerier(h, h, mint, maxt) +} + +func (h *HeadBlock) Chunk(ref uint32) (chunks.Chunk, error) { + c, ok := h.index.forward[ref] + if !ok { + return nil, errNotFound + } + return c.chunk, nil +} + +// Stats returns statisitics about the indexed data. +func (h *HeadBlock) Stats() (*BlockStats, error) { + return nil, nil +} + +// LabelValues returns the possible label values +func (h *HeadBlock) LabelValues(names ...string) (StringTuples, error) { + if len(names) != 1 { + return nil, errInvalidSize + } + var sl []string + + for s := range h.index.values[names[0]] { + sl = append(sl, s) + } + sort.Strings(sl) + + t := &stringTuples{ + l: len(names), + s: sl, + } + return t, nil +} + +// Postings returns the postings list iterator for the label pair. +func (h *HeadBlock) Postings(name, value string) (Iterator, error) { + return h.index.Postings(term{name, value}), nil +} + +// Series returns the series for the given reference. +func (h *HeadBlock) Series(ref uint32) (Series, error) { + cd, ok := h.index.forward[ref] + if !ok { + return nil, errNotFound + } + s := &series{ + labels: cd.lset, + offsets: []ChunkOffset{ + {Value: h.baseTimestamp, Offset: 0}, + }, + chunk: func(ref uint32) (chunks.Chunk, error) { + return cd.chunk, nil + }, + } + return s, nil +} + // get retrieves the chunk with the hash and label set and creates // a new one if it doesn't exist yet. func (h *HeadBlock) get(hash uint64, lset Labels) *chunkDesc { diff --git a/index.go b/index.go index 43bbd8ee0..9a7e10228 100644 --- a/index.go +++ b/index.go @@ -5,11 +5,6 @@ import ( "strings" ) -// Index provides read access to an inverted index. -type Index interface { - Postings(ref uint32) Iterator -} - type memIndex struct { lastID uint32 @@ -111,7 +106,7 @@ func (e errIterator) Err() error { return e.err } // input iterators. func Intersect(its ...Iterator) Iterator { if len(its) == 0 { - return nil + return errIterator{err: nil} } a := its[0] diff --git a/querier.go b/querier.go index 3b049332b..c359ee34a 100644 --- a/querier.go +++ b/querier.go @@ -76,13 +76,6 @@ func (db *DB) Querier(mint, maxt int64) Querier { return q } -// SeriesSet contains a set of series. -type SeriesSet interface { - Next() bool - Series() Series - Err() error -} - func (q *querier) Select(ms ...Matcher) SeriesSet { // We gather the non-overlapping series from every shard and simply // return their union. @@ -91,6 +84,9 @@ func (q *querier) Select(ms ...Matcher) SeriesSet { for _, s := range q.shards { r.sets = append(r.sets, s.Select(ms...)) } + if len(r.sets) == 0 { + return nopSeriesSet{} + } return r } @@ -127,6 +123,123 @@ func (s *SeriesShard) Querier(mint, maxt int64) Querier { return sq } +func (q *shardQuerier) LabelValues(string) ([]string, error) { + return nil, nil +} + +func (q *shardQuerier) LabelValuesFor(string, Label) ([]string, error) { + return nil, fmt.Errorf("not implemented") +} + +func (q *shardQuerier) Close() error { + return nil +} + +// blockQuerier provides querying access to a single block database. +type blockQuerier struct { + index IndexReader + series SeriesReader + + mint, maxt int64 +} + +func newBlockQuerier(ix IndexReader, s SeriesReader, mint, maxt int64) *blockQuerier { + return &blockQuerier{ + mint: mint, + maxt: maxt, + index: ix, + series: s, + } +} + +func (q *blockQuerier) Select(ms ...Matcher) SeriesSet { + var its []Iterator + for _, m := range ms { + its = append(its, q.selectSingle(m)) + } + + // TODO(fabxc): pass down time range so the series iterator + // can be instantiated with it? + return &blockSeriesSet{ + index: q.index, + it: Intersect(its...), + } +} + +func (q *blockQuerier) selectSingle(m Matcher) Iterator { + tpls, err := q.index.LabelValues(m.Name()) + if err != nil { + return errIterator{err: err} + } + // TODO(fabxc): use interface upgrading to provide fast solution + // for equality and prefix matches. Tuples are lexicographically sorted. + var res []string + + for i := 0; i < tpls.Len(); i++ { + vals, err := tpls.At(i) + if err != nil { + return errIterator{err: err} + } + if m.Match(vals[0]) { + res = append(res, vals[0]) + } + } + + if len(res) == 0 { + return errIterator{err: nil} + } + + var rit []Iterator + + for _, v := range res { + it, err := q.index.Postings(m.Name(), v) + if err != nil { + return errIterator{err: err} + } + rit = append(rit, it) + } + + return Intersect(rit...) +} + +func (q *blockQuerier) LabelValues(name string) ([]string, error) { + tpls, err := q.index.LabelValues(name) + if err != nil { + return nil, err + } + res := make([]string, 0, tpls.Len()) + + for i := 0; i < tpls.Len(); i++ { + vals, err := tpls.At(i) + if err != nil { + return nil, err + } + res = append(res, vals[0]) + } + return nil, nil +} + +func (q *blockQuerier) LabelValuesFor(string, Label) ([]string, error) { + return nil, fmt.Errorf("not implemented") +} + +func (q *blockQuerier) Close() error { + return nil +} + +// SeriesSet contains a set of series. +type SeriesSet interface { + Next() bool + Series() Series + Err() error +} + +type nopSeriesSet struct{} + +func (nopSeriesSet) Next() bool { return false } +func (nopSeriesSet) Series() Series { return nil } +func (nopSeriesSet) Err() error { return nil } + type mergedSeriesSet struct { sets []SeriesSet @@ -143,14 +256,32 @@ func (s *mergedSeriesSet) Next() bool { if s.sets[s.cur].Next() { return true } - s.cur++ - if s.cur == len(s.sets) { + if s.cur == len(s.sets)-1 { return false } + s.cur++ + return s.Next() } +func (q *shardQuerier) Select(ms ...Matcher) SeriesSet { + // Sets from different blocks have no time overlap. The reference numbers + // they emit point to series sorted in lexicographic order. + // We can fully connect partial series by simply comparing with the previous + // label set. + if len(q.blocks) == 0 { + return nopSeriesSet{} + } + r := q.blocks[0].Select(ms...) + + for _, s := range q.blocks[1:] { + r = &shardSeriesSet{a: r, b: s.Select(ms...)} + } + + return r +} + type shardSeriesSet struct { a, b SeriesSet @@ -245,123 +376,6 @@ func (s *shardSeriesSet) Next() bool { return true } -func (q *shardQuerier) Select(ms ...Matcher) SeriesSet { - // Sets from different blocks have no time overlap. The reference numbers - // they emit point to series sorted in lexicographic order. - // We can fully connect partial series by simply comparing with the previous - // label set. - if len(q.blocks) == 0 { - return nil - } - r := q.blocks[0].Select(ms...) - - for _, s := range q.blocks[1:] { - r = &shardSeriesSet{a: r, b: s.Select(ms...)} - } - - return r -} - -func (q *shardQuerier) LabelValues(string) ([]string, error) { - return nil, nil -} - -func (q *shardQuerier) LabelValuesFor(string, Label) ([]string, error) { - return nil, fmt.Errorf("not implemented") -} - -func (q *shardQuerier) Close() error { - return nil -} - -// blockQuerier provides querying access to a single block database. -type blockQuerier struct { - index IndexReader - series SeriesReader - - mint, maxt int64 -} - -func newBlockQuerier(ix IndexReader, s SeriesReader, mint, maxt int64) *blockQuerier { - return &blockQuerier{ - mint: mint, - maxt: maxt, - index: ix, - series: s, - } -} - -func (q *blockQuerier) Select(ms ...Matcher) SeriesSet { - var its []Iterator - for _, m := range ms { - its = append(its, q.selectSingle(m)) - } - - // TODO(fabxc): pass down time range so the series iterator - // can be instantiated with it? - return &blockSeriesSet{ - index: q.index, - it: Intersect(its...), - } -} - -func (q *blockQuerier) selectSingle(m Matcher) Iterator { - tpls, err := q.index.LabelValues(m.Name()) - if err != nil { - return errIterator{err: err} - } - // TODO(fabxc): use interface upgrading to provide fast solution - // for equality and prefix matches. Tuples are lexicographically sorted. - var res []string - - for i := 0; i < tpls.Len(); i++ { - vals, err := tpls.At(i) - if err != nil { - return errIterator{err: err} - } - if m.Match(vals[0]) { - res = append(res, vals[0]) - } - } - - var rit Iterator - - for _, v := range res { - it, err := q.index.Postings(m.Name(), v) - if err != nil { - return errIterator{err: err} - } - rit = Intersect(rit, it) - } - - return rit -} - -func (q *blockQuerier) LabelValues(name string) ([]string, error) { - tpls, err := q.index.LabelValues(name) - if err != nil { - return nil, err - } - res := make([]string, 0, tpls.Len()) - - for i := 0; i < tpls.Len(); i++ { - vals, err := tpls.At(i) - if err != nil { - return nil, err - } - res = append(res, vals[0]) - } - return nil, nil -} - -func (q *blockQuerier) LabelValuesFor(string, Label) ([]string, error) { - return nil, fmt.Errorf("not implemented") -} - -func (q *blockQuerier) Close() error { - return nil -} - // blockSeriesSet is a set of series from an inverted index query. type blockSeriesSet struct { index IndexReader @@ -500,7 +514,7 @@ func (it *chunkSeriesIterator) Err() error { return it.cur.Err() } -type bufferedSeriesIterator struct { +type BufferedSeriesIterator struct { // TODO(fabxc): time-based look back buffer for time-aggregating // queries such as rate. It should allow us to re-use an iterator // within a range query while calculating time-aggregates at any point. @@ -509,10 +523,11 @@ type bufferedSeriesIterator struct { // the simpler interface. // // Consider making this the main external interface. - SeriesIterator + it SeriesIterator + n int - buf []sample // lookback buffer - i int // current head + buf []sample // lookback buffer + last sample } type sample struct { @@ -520,6 +535,44 @@ type sample struct { v float64 } -func (b *bufferedSeriesIterator) PeekBack(i int) (t int64, v float64, ok bool) { - return 0, 0, false +func NewBufferedSeriesIterator(it SeriesIterator) *BufferedSeriesIterator { + return &BufferedSeriesIterator{ + it: it, + } +} + +func (b *BufferedSeriesIterator) PeekBack() (t int64, v float64, ok bool) { + return b.last.t, b.last.v, true +} + +func (b *BufferedSeriesIterator) Seek(t int64) bool { + t0 := t - 20000 // TODO(fabxc): hard-coded 20s lookback, make configurable. + + ok := b.it.Seek(t0) + if !ok { + return false + } + b.last.t, b.last.v = b.it.Values() + + // TODO(fabxc): skip to relevant chunk. + for b.Next() { + if ts, _ := b.Values(); ts >= t { + return true + } + } + return false +} + +func (b *BufferedSeriesIterator) Next() bool { + b.last.t, b.last.v = b.it.Values() + + return b.it.Next() +} + +func (b *BufferedSeriesIterator) Values() (int64, float64) { + return b.it.Values() +} + +func (b *BufferedSeriesIterator) Err() error { + return b.it.Err() } diff --git a/reader.go b/reader.go index ab3621209..ffc7ac0e8 100644 --- a/reader.go +++ b/reader.go @@ -54,7 +54,7 @@ type IndexReader interface { // LabelValues returns the possible label values LabelValues(names ...string) (StringTuples, error) - // Postings returns the postings list iteartor for the label pair. + // Postings returns the postings list iterator for the label pair. Postings(name, value string) (Iterator, error) // Series returns the series for the given reference. @@ -83,6 +83,7 @@ type indexReader struct { var ( errInvalidSize = fmt.Errorf("invalid size") errInvalidFlag = fmt.Errorf("invalid flag") + errNotFound = fmt.Errorf("not found") ) func newIndexReader(s SeriesReader, b []byte) (*indexReader, error) { @@ -299,6 +300,14 @@ func (s *series) Labels() Labels { func (s *series) Iterator() SeriesIterator { var cs []chunks.Chunk + for _, co := range s.offsets { + c, err := s.chunk(co.Offset) + if err != nil { + panic(err) // TODO(fabxc): add error series iterator. + } + cs = append(cs, c) + } + return newChunkSeriesIterator(cs) }