Misc fixes for initial Prometheus integration

This commit is contained in:
Fabian Reinartz 2016-12-14 18:38:46 +01:00
parent 725385ea05
commit ca89080128
6 changed files with 269 additions and 374 deletions

View File

@ -1,223 +0,0 @@
package tsdb
// import (
// "errors"
// "fmt"
// "time"
// "github.com/prometheus/common/model"
// "github.com/prometheus/prometheus/storage/local"
// "github.com/prometheus/prometheus/storage/metric"
// "golang.org/x/net/context"
// )
// type DefaultSeriesIterator struct {
// s Series
// it SeriesIterator
// }
// func (it *DefaultSeriesIterator) ValueAtOrBeforeTime(ts model.Time) model.SamplePair {
// ok := it.it.Seek(int64(ts))
// if !ok {
// return model.SamplePair{Timestamp: model.Earliest}
// }
// t, v := it.it.Values()
// return model.SamplePair{Timestamp: model.Time(t), Value: model.SampleValue(v)}
// }
// func (it *DefaultSeriesIterator) Metric() metric.Metric {
// ls := it.s.Labels()
// met := make(model.Metric, len(ls))
// for _, l := range ls {
// met[model.LabelName(l.Name)] = model.LabelValue(l.Value)
// }
// return metric.Metric{Metric: met, Copied: true}
// }
// func (it *DefaultSeriesIterator) RangeValues(interval metric.Interval) []model.SamplePair {
// var res []model.SamplePair
// for ok := it.it.Seek(int64(interval.NewestInclusive)); ok; ok = it.it.Next() {
// t, v := it.it.Values()
// if model.Time(t) > interval.OldestInclusive {
// break
// }
// res = append(res, model.SamplePair{Timestamp: model.Time(t), Value: model.SampleValue(v)})
// }
// return res
// }
// func (it *DefaultSeriesIterator) Close() {
// }
// // DefaultAdapter wraps a tsdb storage to implement the default
// // storage interface.
// type DefaultAdapter struct {
// db *DB
// }
// func NewDefaultAdapter(db *DB) *DefaultAdapter {
// return &DefaultAdapter{db: db}
// }
// // Drop all time series associated with the given label matchers. Returns
// // the number series that were dropped.
// func (da *DefaultAdapter) DropMetricsForLabelMatchers(context.Context, ...*metric.LabelMatcher) (int, error) {
// return 0, fmt.Errorf("not implemented")
// }
// // Start the various maintenance loops in goroutines. Returns when the
// // storage is ready to use. Keeps everything running in the background
// // until Stop is called.
// func (da *DefaultAdapter) Start() error {
// return nil
// }
// // Stop shuts down the Storage gracefully, flushes all pending
// // operations, stops all maintenance loops,and frees all resources.
// func (da *DefaultAdapter) Stop() error {
// return da.db.Close()
// }
// // WaitForIndexing returns once all samples in the storage are
// // indexed. Indexing is needed for FingerprintsForLabelMatchers and
// // LabelValuesForLabelName and may lag behind.
// func (da *DefaultAdapter) WaitForIndexing() {
// }
// func (da *DefaultAdapter) Append(s *model.Sample) error {
// labels := make([]Label, len(s.Metric))
// for k, v := range s.Metric {
// labels = append(labels, Label{Name: string(k), Value: string(v)})
// }
// // Ignore the Scrape batching for now.
// return da.db.appendSingle(labels, int64(s.Timestamp), float64(s.Value))
// }
// func (da *DefaultAdapter) NeedsThrottling() bool {
// return false
// }
// func (da *DefaultAdapter) Querier() (local.Querier, error) {
// // q, err := da.db.Querier()
// // if err != nil {
// // return nil, err
// // }
// return defaultQuerierAdapter{q: q}, nil
// }
// type defaultQuerierAdapter struct {
// q Querier
// }
// func (da defaultQuerierAdapter) Close() error {
// return da.q.Close()
// }
// // QueryRange returns a list of series iterators for the selected
// // time range and label matchers. The iterators need to be closed
// // after usage.
// func (da defaultQuerierAdapter) QueryRange(ctx context.Context, from, through model.Time, matchers ...*metric.LabelMatcher) ([]local.SeriesIterator, error) {
// it, err := labelMatchersIter(da.q, matchers...)
// if err != nil {
// return nil, err
// }
// its, err := da.q.Series(it)
// if err != nil {
// return nil, err
// }
// var defaultIts []local.SeriesIterator
// for _, it := range its {
// defaultIts = append(defaultIts, &DefaultSeriesIterator{it: it})
// }
// return defaultIts, nil
// }
// // QueryInstant returns a list of series iterators for the selected
// // instant and label matchers. The iterators need to be closed after usage.
// func (da defaultQuerierAdapter) QueryInstant(ctx context.Context, ts model.Time, stalenessDelta time.Duration, matchers ...*metric.LabelMatcher) ([]local.SeriesIterator, error) {
// return da.QueryRange(ctx, ts.Add(-stalenessDelta), ts, matchers...)
// }
// // MetricsForLabelMatchers returns the metrics from storage that satisfy
// // the given sets of label matchers. Each set of matchers must contain at
// // least one label matcher that does not match the empty string. Otherwise,
// // an empty list is returned. Within one set of matchers, the intersection
// // of matching series is computed. The final return value will be the union
// // of the per-set results. The times from and through are hints for the
// // storage to optimize the search. The storage MAY exclude metrics that
// // have no samples in the specified interval from the returned map. In
// // doubt, specify model.Earliest for from and model.Latest for through.
// func (da defaultQuerierAdapter) MetricsForLabelMatchers(ctx context.Context, from, through model.Time, matcherSets ...metric.LabelMatchers) ([]metric.Metric, error) {
// var mits []index.Iterator
// for _, ms := range matcherSets {
// it, err := labelMatchersIter(da.q, ms...)
// if err != nil {
// return nil, err
// }
// mits = append(mits, it)
// }
// res, err := da.q.Metrics(index.Merge(mits...))
// if err != nil {
// return nil, err
// }
// var mres []metric.Metric
// for _, m := range res {
// met := make(model.Metric, len(m))
// for k, v := range m {
// met[model.LabelName(k)] = model.LabelValue(v)
// }
// mres = append(mres, metric.Metric{Metric: met, Copied: true})
// }
// return mres, nil
// }
// func labelMatchersIter(q *Querier, ms ...*metric.LabelMatcher) (index.Iterator, error) {
// var its []index.Iterator
// for _, m := range ms {
// var matcher index.Matcher
// switch m.Type {
// case metric.Equal:
// matcher = index.NewEqualMatcher(string(m.Value))
// case metric.RegexMatch:
// var err error
// matcher, err = index.NewRegexpMatcher(string(m.Value))
// if err != nil {
// return nil, err
// }
// default:
// return nil, fmt.Errorf("matcher type %q not supported", m.Type)
// }
// it, err := q.Iterator(string(m.Name), matcher)
// if err != nil {
// return nil, err
// }
// its = append(its, it)
// }
// if len(its) == 0 {
// return nil, errors.New("not found")
// }
// return index.Intersect(its...), nil
// }
// // LastSampleForFingerprint returns the last sample that has been
// // ingested for the given sets of label matchers. If this instance of the
// // Storage has never ingested a sample for the provided fingerprint (or
// // the last ingestion is so long ago that the series has been archived),
// // ZeroSample is returned. The label matching behavior is the same as in
// // MetricsForLabelMatchers.
// func (da defaultQuerierAdapter) LastSampleForLabelMatchers(ctx context.Context, cutoff model.Time, matcherSets ...metric.LabelMatchers) (model.Vector, error) {
// return nil, fmt.Errorf("not implemented")
// }
// // Get all of the label values that are associated with a given label name.
// func (da defaultQuerierAdapter) LabelValuesForLabelName(ctx context.Context, ln model.LabelName) (model.LabelValues, error) {
// res := da.q.iq.Terms(string(ln), nil)
// resv := model.LabelValues{}
// for _, lv := range res {
// resv = append(resv, model.LabelValue(lv))
// }
// return resv, nil
// }

24
db.go
View File

@ -12,7 +12,7 @@ import (
"github.com/cespare/xxhash"
"github.com/fabxc/tsdb/chunks"
"github.com/prometheus/common/log"
"github.com/go-kit/kit/log"
)
// DefaultOptions used for the DB. They are sane for setups using
@ -37,7 +37,7 @@ type DB struct {
// TODO(fabxc): make configurable
const (
seriesShardShift = 3
seriesShardShift = 2
numSeriesShards = 1 << seriesShardShift
maxChunkSize = 1024
)
@ -50,6 +50,10 @@ func Open(path string, l log.Logger, opts *Options) (*DB, error) {
if err := os.MkdirAll(path, 0777); err != nil {
return nil, err
}
if l == nil {
l = log.NewLogfmtLogger(os.Stdout)
l = log.NewContext(l).With("ts", log.DefaultTimestampUTC, "caller", log.DefaultCaller)
}
c := &DB{
logger: l,
@ -62,7 +66,8 @@ func Open(path string, l log.Logger, opts *Options) (*DB, error) {
// for the bitshift-modulo when finding the right shard.
for i := 0; i < numSeriesShards; i++ {
path := filepath.Join(path, fmt.Sprintf("%d", i))
c.shards = append(c.shards, NewSeriesShard(path, l.With("shard", i)))
l := log.NewContext(l).With("shard", i)
c.shards = append(c.shards, NewSeriesShard(path, l))
}
// TODO(fabxc): run background compaction + GC.
@ -170,7 +175,8 @@ func (db *DB) AppendVector(ts int64, v *Vector) error {
return nil
}
func (db *DB) appendSingle(lset Labels, ts int64, v float64) error {
func (db *DB) AppendSingle(lset Labels, ts int64, v float64) error {
sort.Sort(lset)
h := lset.Hash()
s := uint16(h >> (64 - seriesShardShift))
@ -190,7 +196,6 @@ const sep = '\xff'
type SeriesShard struct {
path string
persistCh chan struct{}
done chan struct{}
logger log.Logger
mtx sync.RWMutex
@ -203,7 +208,6 @@ func NewSeriesShard(path string, logger log.Logger) *SeriesShard {
s := &SeriesShard{
path: path,
persistCh: make(chan struct{}, 1),
done: make(chan struct{}),
logger: logger,
// TODO(fabxc): restore from checkpoint.
// TODO(fabxc): provide access to persisted blocks.
@ -218,7 +222,6 @@ func NewSeriesShard(path string, logger log.Logger) *SeriesShard {
// Close the series shard.
func (s *SeriesShard) Close() error {
close(s.done)
return nil
}
@ -255,7 +258,7 @@ func (s *SeriesShard) appendBatch(ts int64, samples []Sample) error {
// blocksForRange returns all blocks within the shard that may contain
// data for the given time range.
func (s *SeriesShard) blocksForRange(mint, maxt int64) (bs []Block) {
return nil
return []Block{s.head}
}
// TODO(fabxc): make configurable.
@ -325,10 +328,7 @@ func (s *SeriesShard) persist() error {
sz := fmt.Sprintf("%.2fMiB", float64(sw.Size()+iw.Size())/1024/1024)
s.logger.With("size", sz).
With("samples", head.samples).
With("chunks", head.stats().chunks).
Debug("persisted head")
s.logger.Log("size", sz, "samples", head.samples, "chunks", head.stats().chunks, "msg", "persisted head")
return nil
}

61
head.go
View File

@ -2,6 +2,7 @@ package tsdb
import (
"math"
"sort"
"sync"
"github.com/fabxc/tsdb/chunks"
@ -27,6 +28,66 @@ func NewHeadBlock(baseTime int64) *HeadBlock {
}
}
// Querier returns a new querier over the head block.
func (h *HeadBlock) Querier(mint, maxt int64) Querier {
return newBlockQuerier(h, h, mint, maxt)
}
func (h *HeadBlock) Chunk(ref uint32) (chunks.Chunk, error) {
c, ok := h.index.forward[ref]
if !ok {
return nil, errNotFound
}
return c.chunk, nil
}
// Stats returns statisitics about the indexed data.
func (h *HeadBlock) Stats() (*BlockStats, error) {
return nil, nil
}
// LabelValues returns the possible label values
func (h *HeadBlock) LabelValues(names ...string) (StringTuples, error) {
if len(names) != 1 {
return nil, errInvalidSize
}
var sl []string
for s := range h.index.values[names[0]] {
sl = append(sl, s)
}
sort.Strings(sl)
t := &stringTuples{
l: len(names),
s: sl,
}
return t, nil
}
// Postings returns the postings list iterator for the label pair.
func (h *HeadBlock) Postings(name, value string) (Iterator, error) {
return h.index.Postings(term{name, value}), nil
}
// Series returns the series for the given reference.
func (h *HeadBlock) Series(ref uint32) (Series, error) {
cd, ok := h.index.forward[ref]
if !ok {
return nil, errNotFound
}
s := &series{
labels: cd.lset,
offsets: []ChunkOffset{
{Value: h.baseTimestamp, Offset: 0},
},
chunk: func(ref uint32) (chunks.Chunk, error) {
return cd.chunk, nil
},
}
return s, nil
}
// get retrieves the chunk with the hash and label set and creates
// a new one if it doesn't exist yet.
func (h *HeadBlock) get(hash uint64, lset Labels) *chunkDesc {

View File

@ -5,11 +5,6 @@ import (
"strings"
)
// Index provides read access to an inverted index.
type Index interface {
Postings(ref uint32) Iterator
}
type memIndex struct {
lastID uint32
@ -111,7 +106,7 @@ func (e errIterator) Err() error { return e.err }
// input iterators.
func Intersect(its ...Iterator) Iterator {
if len(its) == 0 {
return nil
return errIterator{err: nil}
}
a := its[0]

View File

@ -76,13 +76,6 @@ func (db *DB) Querier(mint, maxt int64) Querier {
return q
}
// SeriesSet contains a set of series.
type SeriesSet interface {
Next() bool
Series() Series
Err() error
}
func (q *querier) Select(ms ...Matcher) SeriesSet {
// We gather the non-overlapping series from every shard and simply
// return their union.
@ -91,6 +84,9 @@ func (q *querier) Select(ms ...Matcher) SeriesSet {
for _, s := range q.shards {
r.sets = append(r.sets, s.Select(ms...))
}
if len(r.sets) == 0 {
return nopSeriesSet{}
}
return r
}
@ -127,6 +123,123 @@ func (s *SeriesShard) Querier(mint, maxt int64) Querier {
return sq
}
func (q *shardQuerier) LabelValues(string) ([]string, error) {
return nil, nil
}
func (q *shardQuerier) LabelValuesFor(string, Label) ([]string, error) {
return nil, fmt.Errorf("not implemented")
}
func (q *shardQuerier) Close() error {
return nil
}
// blockQuerier provides querying access to a single block database.
type blockQuerier struct {
index IndexReader
series SeriesReader
mint, maxt int64
}
func newBlockQuerier(ix IndexReader, s SeriesReader, mint, maxt int64) *blockQuerier {
return &blockQuerier{
mint: mint,
maxt: maxt,
index: ix,
series: s,
}
}
func (q *blockQuerier) Select(ms ...Matcher) SeriesSet {
var its []Iterator
for _, m := range ms {
its = append(its, q.selectSingle(m))
}
// TODO(fabxc): pass down time range so the series iterator
// can be instantiated with it?
return &blockSeriesSet{
index: q.index,
it: Intersect(its...),
}
}
func (q *blockQuerier) selectSingle(m Matcher) Iterator {
tpls, err := q.index.LabelValues(m.Name())
if err != nil {
return errIterator{err: err}
}
// TODO(fabxc): use interface upgrading to provide fast solution
// for equality and prefix matches. Tuples are lexicographically sorted.
var res []string
for i := 0; i < tpls.Len(); i++ {
vals, err := tpls.At(i)
if err != nil {
return errIterator{err: err}
}
if m.Match(vals[0]) {
res = append(res, vals[0])
}
}
if len(res) == 0 {
return errIterator{err: nil}
}
var rit []Iterator
for _, v := range res {
it, err := q.index.Postings(m.Name(), v)
if err != nil {
return errIterator{err: err}
}
rit = append(rit, it)
}
return Intersect(rit...)
}
func (q *blockQuerier) LabelValues(name string) ([]string, error) {
tpls, err := q.index.LabelValues(name)
if err != nil {
return nil, err
}
res := make([]string, 0, tpls.Len())
for i := 0; i < tpls.Len(); i++ {
vals, err := tpls.At(i)
if err != nil {
return nil, err
}
res = append(res, vals[0])
}
return nil, nil
}
func (q *blockQuerier) LabelValuesFor(string, Label) ([]string, error) {
return nil, fmt.Errorf("not implemented")
}
func (q *blockQuerier) Close() error {
return nil
}
// SeriesSet contains a set of series.
type SeriesSet interface {
Next() bool
Series() Series
Err() error
}
type nopSeriesSet struct{}
func (nopSeriesSet) Next() bool { return false }
func (nopSeriesSet) Series() Series { return nil }
func (nopSeriesSet) Err() error { return nil }
type mergedSeriesSet struct {
sets []SeriesSet
@ -143,14 +256,32 @@ func (s *mergedSeriesSet) Next() bool {
if s.sets[s.cur].Next() {
return true
}
s.cur++
if s.cur == len(s.sets) {
if s.cur == len(s.sets)-1 {
return false
}
s.cur++
return s.Next()
}
func (q *shardQuerier) Select(ms ...Matcher) SeriesSet {
// Sets from different blocks have no time overlap. The reference numbers
// they emit point to series sorted in lexicographic order.
// We can fully connect partial series by simply comparing with the previous
// label set.
if len(q.blocks) == 0 {
return nopSeriesSet{}
}
r := q.blocks[0].Select(ms...)
for _, s := range q.blocks[1:] {
r = &shardSeriesSet{a: r, b: s.Select(ms...)}
}
return r
}
type shardSeriesSet struct {
a, b SeriesSet
@ -245,123 +376,6 @@ func (s *shardSeriesSet) Next() bool {
return true
}
func (q *shardQuerier) Select(ms ...Matcher) SeriesSet {
// Sets from different blocks have no time overlap. The reference numbers
// they emit point to series sorted in lexicographic order.
// We can fully connect partial series by simply comparing with the previous
// label set.
if len(q.blocks) == 0 {
return nil
}
r := q.blocks[0].Select(ms...)
for _, s := range q.blocks[1:] {
r = &shardSeriesSet{a: r, b: s.Select(ms...)}
}
return r
}
func (q *shardQuerier) LabelValues(string) ([]string, error) {
return nil, nil
}
func (q *shardQuerier) LabelValuesFor(string, Label) ([]string, error) {
return nil, fmt.Errorf("not implemented")
}
func (q *shardQuerier) Close() error {
return nil
}
// blockQuerier provides querying access to a single block database.
type blockQuerier struct {
index IndexReader
series SeriesReader
mint, maxt int64
}
func newBlockQuerier(ix IndexReader, s SeriesReader, mint, maxt int64) *blockQuerier {
return &blockQuerier{
mint: mint,
maxt: maxt,
index: ix,
series: s,
}
}
func (q *blockQuerier) Select(ms ...Matcher) SeriesSet {
var its []Iterator
for _, m := range ms {
its = append(its, q.selectSingle(m))
}
// TODO(fabxc): pass down time range so the series iterator
// can be instantiated with it?
return &blockSeriesSet{
index: q.index,
it: Intersect(its...),
}
}
func (q *blockQuerier) selectSingle(m Matcher) Iterator {
tpls, err := q.index.LabelValues(m.Name())
if err != nil {
return errIterator{err: err}
}
// TODO(fabxc): use interface upgrading to provide fast solution
// for equality and prefix matches. Tuples are lexicographically sorted.
var res []string
for i := 0; i < tpls.Len(); i++ {
vals, err := tpls.At(i)
if err != nil {
return errIterator{err: err}
}
if m.Match(vals[0]) {
res = append(res, vals[0])
}
}
var rit Iterator
for _, v := range res {
it, err := q.index.Postings(m.Name(), v)
if err != nil {
return errIterator{err: err}
}
rit = Intersect(rit, it)
}
return rit
}
func (q *blockQuerier) LabelValues(name string) ([]string, error) {
tpls, err := q.index.LabelValues(name)
if err != nil {
return nil, err
}
res := make([]string, 0, tpls.Len())
for i := 0; i < tpls.Len(); i++ {
vals, err := tpls.At(i)
if err != nil {
return nil, err
}
res = append(res, vals[0])
}
return nil, nil
}
func (q *blockQuerier) LabelValuesFor(string, Label) ([]string, error) {
return nil, fmt.Errorf("not implemented")
}
func (q *blockQuerier) Close() error {
return nil
}
// blockSeriesSet is a set of series from an inverted index query.
type blockSeriesSet struct {
index IndexReader
@ -500,7 +514,7 @@ func (it *chunkSeriesIterator) Err() error {
return it.cur.Err()
}
type bufferedSeriesIterator struct {
type BufferedSeriesIterator struct {
// TODO(fabxc): time-based look back buffer for time-aggregating
// queries such as rate. It should allow us to re-use an iterator
// within a range query while calculating time-aggregates at any point.
@ -509,10 +523,11 @@ type bufferedSeriesIterator struct {
// the simpler interface.
//
// Consider making this the main external interface.
SeriesIterator
it SeriesIterator
n int
buf []sample // lookback buffer
i int // current head
buf []sample // lookback buffer
last sample
}
type sample struct {
@ -520,6 +535,44 @@ type sample struct {
v float64
}
func (b *bufferedSeriesIterator) PeekBack(i int) (t int64, v float64, ok bool) {
return 0, 0, false
func NewBufferedSeriesIterator(it SeriesIterator) *BufferedSeriesIterator {
return &BufferedSeriesIterator{
it: it,
}
}
func (b *BufferedSeriesIterator) PeekBack() (t int64, v float64, ok bool) {
return b.last.t, b.last.v, true
}
func (b *BufferedSeriesIterator) Seek(t int64) bool {
t0 := t - 20000 // TODO(fabxc): hard-coded 20s lookback, make configurable.
ok := b.it.Seek(t0)
if !ok {
return false
}
b.last.t, b.last.v = b.it.Values()
// TODO(fabxc): skip to relevant chunk.
for b.Next() {
if ts, _ := b.Values(); ts >= t {
return true
}
}
return false
}
func (b *BufferedSeriesIterator) Next() bool {
b.last.t, b.last.v = b.it.Values()
return b.it.Next()
}
func (b *BufferedSeriesIterator) Values() (int64, float64) {
return b.it.Values()
}
func (b *BufferedSeriesIterator) Err() error {
return b.it.Err()
}

View File

@ -54,7 +54,7 @@ type IndexReader interface {
// LabelValues returns the possible label values
LabelValues(names ...string) (StringTuples, error)
// Postings returns the postings list iteartor for the label pair.
// Postings returns the postings list iterator for the label pair.
Postings(name, value string) (Iterator, error)
// Series returns the series for the given reference.
@ -83,6 +83,7 @@ type indexReader struct {
var (
errInvalidSize = fmt.Errorf("invalid size")
errInvalidFlag = fmt.Errorf("invalid flag")
errNotFound = fmt.Errorf("not found")
)
func newIndexReader(s SeriesReader, b []byte) (*indexReader, error) {
@ -299,6 +300,14 @@ func (s *series) Labels() Labels {
func (s *series) Iterator() SeriesIterator {
var cs []chunks.Chunk
for _, co := range s.offsets {
c, err := s.chunk(co.Offset)
if err != nil {
panic(err) // TODO(fabxc): add error series iterator.
}
cs = append(cs, c)
}
return newChunkSeriesIterator(cs)
}