Add queriers and partial mocks
This commit is contained in:
parent
9b400b4c58
commit
6eeb0ef01c
26
block.go
26
block.go
|
@ -1,10 +1,6 @@
|
|||
package tsdb
|
||||
|
||||
import (
|
||||
"io"
|
||||
"sort"
|
||||
"unsafe"
|
||||
)
|
||||
import "sort"
|
||||
|
||||
const (
|
||||
magicIndex = 0xCAFECAFE
|
||||
|
@ -12,7 +8,9 @@ const (
|
|||
)
|
||||
|
||||
// Block handles reads against a block of time series data within a time window.
|
||||
type Block interface{}
|
||||
type Block interface {
|
||||
Querier(mint, maxt int64) Querier
|
||||
}
|
||||
|
||||
const (
|
||||
flagNone = 0
|
||||
|
@ -22,9 +20,6 @@ const (
|
|||
// A skiplist maps offsets to values. The values found in the data at an
|
||||
// offset are strictly greater than the indexed value.
|
||||
type skiplist interface {
|
||||
// A skiplist can serialize itself into a writer.
|
||||
io.WriterTo
|
||||
|
||||
// offset returns the offset to data containing values of x and lower.
|
||||
offset(x int64) (uint32, bool)
|
||||
}
|
||||
|
@ -48,16 +43,3 @@ func (sl simpleSkiplist) offset(x int64) (uint32, bool) {
|
|||
}
|
||||
return sl[i-1].offset, true
|
||||
}
|
||||
|
||||
func (sl simpleSkiplist) WriteTo(w io.Writer) (n int64, err error) {
|
||||
for _, s := range sl {
|
||||
b := ((*[unsafe.Sizeof(skiplistPair{})]byte)(unsafe.Pointer(&s)))[:]
|
||||
|
||||
m, err := w.Write(b)
|
||||
if err != nil {
|
||||
return n + int64(m), err
|
||||
}
|
||||
n += int64(m)
|
||||
}
|
||||
return n, err
|
||||
}
|
||||
|
|
13
db.go
13
db.go
|
@ -89,11 +89,6 @@ func (db *DB) Close() error {
|
|||
return nil
|
||||
}
|
||||
|
||||
// Querier returns a new querier over the database.
|
||||
func (db *DB) Querier(start, end int64) Querier {
|
||||
return nil
|
||||
}
|
||||
|
||||
// Appender adds a batch of samples.
|
||||
type Appender interface {
|
||||
// Add adds a sample pair to the appended batch.
|
||||
|
@ -199,7 +194,7 @@ type SeriesShard struct {
|
|||
logger log.Logger
|
||||
|
||||
mtx sync.RWMutex
|
||||
blocks *Block
|
||||
blocks []*Block
|
||||
head *HeadBlock
|
||||
}
|
||||
|
||||
|
@ -257,6 +252,12 @@ func (s *SeriesShard) appendBatch(ts int64, samples []Sample) error {
|
|||
return nil
|
||||
}
|
||||
|
||||
// blocksForRange returns all blocks within the shard that may contain
|
||||
// data for the given time range.
|
||||
func (s *SeriesShard) blocksForRange(mint, maxt int64) (bs []Block) {
|
||||
return nil
|
||||
}
|
||||
|
||||
// TODO(fabxc): make configurable.
|
||||
const shardGracePeriod = 60 * 1000 // 60 seconds for millisecond scale
|
||||
|
||||
|
|
30
index.go
30
index.go
|
@ -83,16 +83,30 @@ func (p *memPostings) add(id uint32, terms ...term) {
|
|||
|
||||
// Iterator provides iterative access over a postings list.
|
||||
type Iterator interface {
|
||||
// Next advances the iterator and returns true if another
|
||||
// value was found.
|
||||
// Next advances the iterator and returns true if another value was found.
|
||||
Next() bool
|
||||
|
||||
// Seek advances the iterator to value v or greater and returns
|
||||
// true if a value was found.
|
||||
Seek(v uint32) bool
|
||||
|
||||
// Value returns the value at the current iterator position.
|
||||
Value() uint32
|
||||
|
||||
// Err returns the last error of the iterator.
|
||||
Err() error
|
||||
}
|
||||
|
||||
// errIterator is an empty iterator that always errors.
|
||||
type errIterator struct {
|
||||
err error
|
||||
}
|
||||
|
||||
func (e errIterator) Next() bool { return false }
|
||||
func (e errIterator) Seek(uint32) bool { return false }
|
||||
func (e errIterator) Value() uint32 { return 0 }
|
||||
func (e errIterator) Err() error { return e.err }
|
||||
|
||||
// Intersect returns a new iterator over the intersection of the
|
||||
// input iterators.
|
||||
func Intersect(its ...Iterator) Iterator {
|
||||
|
@ -123,6 +137,10 @@ func (it *intersectIterator) Seek(id uint32) bool {
|
|||
return false
|
||||
}
|
||||
|
||||
func (it *intersectIterator) Err() error {
|
||||
return nil
|
||||
}
|
||||
|
||||
// Merge returns a new iterator over the union of the input iterators.
|
||||
func Merge(its ...Iterator) Iterator {
|
||||
if len(its) == 0 {
|
||||
|
@ -152,6 +170,10 @@ func (it *mergeIterator) Seek(id uint32) bool {
|
|||
return false
|
||||
}
|
||||
|
||||
func (it *mergeIterator) Err() error {
|
||||
return nil
|
||||
}
|
||||
|
||||
// listIterator implements the Iterator interface over a plain list.
|
||||
type listIterator struct {
|
||||
list []uint32
|
||||
|
@ -175,6 +197,10 @@ func (it *listIterator) Seek(x uint32) bool {
|
|||
return it.idx < len(it.list)
|
||||
}
|
||||
|
||||
func (it *listIterator) Err() error {
|
||||
return nil
|
||||
}
|
||||
|
||||
type stringset map[string]struct{}
|
||||
|
||||
func (ss stringset) set(s string) {
|
||||
|
|
267
querier.go
267
querier.go
|
@ -1,6 +1,10 @@
|
|||
package tsdb
|
||||
|
||||
import "github.com/fabxc/tsdb/chunks"
|
||||
import (
|
||||
"fmt"
|
||||
|
||||
"github.com/fabxc/tsdb/chunks"
|
||||
)
|
||||
|
||||
// Matcher matches a string.
|
||||
type Matcher interface {
|
||||
|
@ -11,34 +15,260 @@ type Matcher interface {
|
|||
// Querier provides querying access over time series data of a fixed
|
||||
// time range.
|
||||
type Querier interface {
|
||||
// Range returns the timestamp range of the Querier.
|
||||
Range() (start, end int64)
|
||||
|
||||
// Iterator returns an interator over the inverted index that
|
||||
// matches the key label by the constraints of Matcher.
|
||||
Iterator(key string, m Matcher) Iterator
|
||||
|
||||
// Labels resolves a label reference into a set of labels.
|
||||
Labels(ref LabelRefs) (Labels, error)
|
||||
|
||||
// Series returns series provided in the index iterator.
|
||||
Series(Iterator) []Series
|
||||
Series(Iterator) ([]Series, error)
|
||||
|
||||
// LabelValues returns all potential values for a label name.
|
||||
LabelValues(string) []string
|
||||
LabelValues(string) ([]string, error)
|
||||
// LabelValuesFor returns all potential values for a label name.
|
||||
// under the constraint of another label.
|
||||
LabelValuesFor(string, Label) []string
|
||||
LabelValuesFor(string, Label) ([]string, error)
|
||||
|
||||
// Close releases the resources of the Querier.
|
||||
Close() error
|
||||
}
|
||||
|
||||
func example(db *DB) error {
|
||||
var m1, m2, m3, m4 Matcher
|
||||
|
||||
q := db.Querier(0, 1000)
|
||||
|
||||
series, err := q.Series(
|
||||
Merge(
|
||||
Intersect(q.Iterator("name", m1), q.Iterator("job", m2)),
|
||||
Intersect(q.Iterator("name", m3), q.Iterator("job", m4)),
|
||||
),
|
||||
)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
for _, s := range series {
|
||||
s.Iterator()
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// Series represents a single time series.
|
||||
type Series interface {
|
||||
Labels() (Labels, error)
|
||||
// Labels returns the complete set of labels identifying the series.
|
||||
Labels() Labels
|
||||
// Iterator returns a new iterator of the data of the series.
|
||||
Iterator() (SeriesIterator, error)
|
||||
Iterator() SeriesIterator
|
||||
}
|
||||
|
||||
func inRange(x, mint, maxt int64) bool {
|
||||
return x >= mint && x <= maxt
|
||||
}
|
||||
|
||||
// querier merges query results from a set of shard querieres.
|
||||
type querier struct {
|
||||
mint, maxt int64
|
||||
shards []Querier
|
||||
}
|
||||
|
||||
// Querier returns a new querier over the database for the given
|
||||
// time range.
|
||||
func (db *DB) Querier(mint, maxt int64) Querier {
|
||||
q := &querier{
|
||||
mint: mint,
|
||||
maxt: maxt,
|
||||
}
|
||||
for _, s := range db.shards {
|
||||
q.shards = append(q.shards, s.Querier(mint, maxt))
|
||||
}
|
||||
|
||||
return q
|
||||
}
|
||||
|
||||
// SeriesSet contains a set of series.
|
||||
type SeriesSet interface {
|
||||
}
|
||||
|
||||
func (q *querier) Select(key string, m Matcher) SeriesSet {
|
||||
return nil
|
||||
}
|
||||
|
||||
func (q *querier) Iterator(key string, m Matcher) Iterator {
|
||||
return nil
|
||||
}
|
||||
|
||||
func (q *querier) Series(Iterator) ([]Series, error) {
|
||||
return nil, nil
|
||||
}
|
||||
|
||||
func (q *querier) LabelValues(string) ([]string, error) {
|
||||
return nil, nil
|
||||
}
|
||||
|
||||
func (q *querier) LabelValuesFor(string, Label) ([]string, error) {
|
||||
return nil, fmt.Errorf("not implemented")
|
||||
}
|
||||
|
||||
func (q *querier) Close() error {
|
||||
return nil
|
||||
}
|
||||
|
||||
// shardQuerier aggregates querying results from time blocks within
|
||||
// a single shard.
|
||||
type shardQuerier struct {
|
||||
blocks []Querier
|
||||
}
|
||||
|
||||
// Querier returns a new querier over the data shard for the given
|
||||
// time range.
|
||||
func (s *SeriesShard) Querier(mint, maxt int64) Querier {
|
||||
blocks := s.blocksForRange(mint, maxt)
|
||||
|
||||
sq := &shardQuerier{
|
||||
blocks: make([]Querier, 0, len(blocks)),
|
||||
}
|
||||
for _, b := range blocks {
|
||||
sq.blocks = append(sq.blocks, b.Querier(mint, maxt))
|
||||
}
|
||||
|
||||
return sq
|
||||
}
|
||||
|
||||
func (q *shardQuerier) Iterator(name string, m Matcher) Iterator {
|
||||
// Iterators from different blocks have no time overlap. The reference numbers
|
||||
// they emit point to series sorted in lexicographic order.
|
||||
// If actually retrieving an iterator result via the Series method, we can fully
|
||||
// deduplicate series by simply comparing with the previous label set.
|
||||
var rit Iterator
|
||||
|
||||
for _, s := range q.blocks {
|
||||
rit = Merge(rit, s.Iterator(name, m))
|
||||
}
|
||||
|
||||
return rit
|
||||
}
|
||||
|
||||
func (q *shardQuerier) Series(it Iterator) ([]Series, error) {
|
||||
// Dedulicate series as we stream through the iterator. See comment
|
||||
// on the Iterator method.
|
||||
|
||||
var series []Series
|
||||
// var prev Labels
|
||||
|
||||
// for it.Next() {
|
||||
// s, err := q.index.Series(it.Value())
|
||||
// if err != nil {
|
||||
// return nil, err
|
||||
// }
|
||||
// series = append(series, s)
|
||||
// }
|
||||
// if it.Err() != nil {
|
||||
// return nil, it.Err()
|
||||
// }
|
||||
|
||||
return series, nil
|
||||
}
|
||||
|
||||
func (q *shardQuerier) LabelValues(string) ([]string, error) {
|
||||
return nil, nil
|
||||
}
|
||||
|
||||
func (q *shardQuerier) LabelValuesFor(string, Label) ([]string, error) {
|
||||
return nil, fmt.Errorf("not implemented")
|
||||
}
|
||||
|
||||
func (q *shardQuerier) Close() error {
|
||||
return nil
|
||||
}
|
||||
|
||||
// blockQuerier provides querying access to a single block database.
|
||||
type blockQuerier struct {
|
||||
mint, maxt int64
|
||||
|
||||
index IndexReader
|
||||
series SeriesReader
|
||||
}
|
||||
|
||||
func newBlockQuerier(ix IndexReader, s SeriesReader, mint, maxt int64) *blockQuerier {
|
||||
return &blockQuerier{
|
||||
mint: mint,
|
||||
maxt: maxt,
|
||||
index: ix,
|
||||
series: s,
|
||||
}
|
||||
}
|
||||
|
||||
func (q *blockQuerier) Iterator(name string, m Matcher) Iterator {
|
||||
tpls, err := q.index.LabelValues(name)
|
||||
if err != nil {
|
||||
return errIterator{err: err}
|
||||
}
|
||||
// TODO(fabxc): use interface upgrading to provide fast solution
|
||||
// for equality and prefix matches. Tuples are lexicographically sorted.
|
||||
var res []string
|
||||
|
||||
for i := 0; i < tpls.Len(); i++ {
|
||||
vals, err := tpls.At(i)
|
||||
if err != nil {
|
||||
return errIterator{err: err}
|
||||
}
|
||||
if m.Match(vals[0]) {
|
||||
res = append(res, vals[0])
|
||||
}
|
||||
}
|
||||
|
||||
var rit Iterator
|
||||
|
||||
for _, v := range res {
|
||||
it, err := q.index.Postings(name, v)
|
||||
if err != nil {
|
||||
return errIterator{err: err}
|
||||
}
|
||||
rit = Intersect(rit, it)
|
||||
}
|
||||
|
||||
return rit
|
||||
}
|
||||
|
||||
func (q *blockQuerier) Series(it Iterator) ([]Series, error) {
|
||||
var series []Series
|
||||
|
||||
for it.Next() {
|
||||
s, err := q.index.Series(it.Value())
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
series = append(series, s)
|
||||
}
|
||||
if it.Err() != nil {
|
||||
return nil, it.Err()
|
||||
}
|
||||
|
||||
return series, nil
|
||||
}
|
||||
|
||||
func (q *blockQuerier) LabelValues(name string) ([]string, error) {
|
||||
tpls, err := q.index.LabelValues(name)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
res := make([]string, 0, tpls.Len())
|
||||
|
||||
for i := 0; i < tpls.Len(); i++ {
|
||||
vals, err := tpls.At(i)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
res = append(res, vals[0])
|
||||
}
|
||||
return nil, nil
|
||||
}
|
||||
|
||||
func (q *blockQuerier) LabelValuesFor(string, Label) ([]string, error) {
|
||||
return nil, fmt.Errorf("not implemented")
|
||||
}
|
||||
|
||||
func (q *blockQuerier) Close() error {
|
||||
return nil
|
||||
}
|
||||
|
||||
// SeriesIterator iterates over the data of a time series.
|
||||
|
@ -118,4 +348,17 @@ type bufferedSeriesIterator struct {
|
|||
// the simpler interface.
|
||||
//
|
||||
// Consider making this the main external interface.
|
||||
SeriesIterator
|
||||
|
||||
buf []sample // lookback buffer
|
||||
i int // current head
|
||||
}
|
||||
|
||||
type sample struct {
|
||||
t int64
|
||||
v float64
|
||||
}
|
||||
|
||||
func (b *bufferedSeriesIterator) PeekBack(i int) (t int64, v float64, ok bool) {
|
||||
return 0, 0, false
|
||||
}
|
||||
|
|
11
reader.go
11
reader.go
|
@ -292,13 +292,14 @@ type series struct {
|
|||
chunk func(ref uint32) (chunks.Chunk, error)
|
||||
}
|
||||
|
||||
func (s *series) Labels() (Labels, error) {
|
||||
return s.labels, nil
|
||||
func (s *series) Labels() Labels {
|
||||
return s.labels
|
||||
}
|
||||
|
||||
func (s *series) Iterator() (SeriesIterator, error) {
|
||||
// dereference skiplist and construct from chunk iterators.
|
||||
return nil, nil
|
||||
func (s *series) Iterator() SeriesIterator {
|
||||
var cs []chunks.Chunk
|
||||
|
||||
return newChunkSeriesIterator(cs)
|
||||
}
|
||||
|
||||
type stringTuples struct {
|
||||
|
|
Loading…
Reference in New Issue