Add new interfaces and skeleton

This commit is contained in:
Fabian Reinartz 2016-12-04 13:16:11 +01:00
parent ff29705571
commit 83574b1565
6 changed files with 576 additions and 19 deletions

196
db.go
View File

@ -2,11 +2,12 @@
package tsdb
import (
"encoding/binary"
"sync"
"fmt"
"os"
"sort"
"time"
"github.com/fabxc/tsdb/chunks"
"github.com/cespare/xxhash"
"github.com/prometheus/common/log"
)
@ -25,39 +26,166 @@ type DB struct {
logger log.Logger
opts *Options
shards map[uint64]*TimeShards
shards []*SeriesShard
}
// TODO(fabxc): make configurable
const (
numSeriesShards = 32
maxChunkSize = 1024
)
// Open or create a new DB.
func Open(path string, l log.Logger, opts *Options) (*DB, error) {
if opts == nil {
opts = DefaultOptions
}
if err := os.MkdirAll(path, 0777); err != nil {
return nil, err
}
c := &DB{
logger: l,
opts: opts,
logger: l,
opts: opts,
}
// Initialize vertical shards.
// TODO(fabxc): validate shard number to be power of 2, which is required
// for the bitshift-modulo when finding the right shard.
for i := 0; i < numSeriesShards; i++ {
c.shards = append(c.shards, NewSeriesShard())
}
// TODO(fabxc): run background compaction + GC.
return c, nil
}
// Close the database.
func (db *DB) Close() error {
return fmt.Errorf("not implemented")
}
// Querier returns a new querier over the database.
func (db *DB) Querier(start, end int64) Querier {
return nil
}
// Matcher matches a string.
type Matcher interface {
// Match returns true if the matcher applies to the string value.
Match(v string) bool
}
// Querier provides querying access over time series data of a fixed
// time range.
type Querier interface {
// Iterator returns an interator over the inverted index that
// matches the key label by the constraints of Matcher.
Iterator(key string, m Matcher) Iterator
// Labels resolves a label reference into a set of labels.
Labels(ref LabelRefs) (Labels, error)
// Series returns series provided in the index iterator.
Series(Iterator) []Series
// Close releases the resources of the Querier.
Close() error
// Range returns the timestamp range of the Querier.
Range() (start, end int64)
}
// Series represents a single time series.
type Series interface {
// LabelsRef returns the label set reference
LabelRefs() LabelRefs
// Iterator returns a new iterator of the data of the series.
Iterator() SeriesIterator
}
// SeriesIterator iterates over the data of a time series.
type SeriesIterator interface {
// Seek advances the iterator forward to the given timestamp.
// If there's no value exactly at ts, it advances to the last value
// before ts.
Seek(ts int64) bool
// Values returns the current timestamp/value pair.
Values() (int64, float64)
// Next advances the iterator by one.
Next() bool
// Err returns the current error.
Err() error
}
type LabelRefs struct {
block uint64
offsets []uint32
}
// Label is a key/value pair of strings.
type Label struct {
Name, Value string
}
// LabelSet is a sorted set of labels. Order has to be guaranteed upon
// Labels is a sorted set of labels. Order has to be guaranteed upon
// instantiation.
type LabelSet []Label
type Labels []Label
func (ls LabelSet) Len() int { return len(ls) }
func (ls LabelSet) Swap(i, j int) { ls[i], ls[j] = ls[j], ls[i]}
func (ls LabelSet) Less(i, j int) bool { return ls[i].Name < ls[j].Name }
func (ls Labels) Len() int { return len(ls) }
func (ls Labels) Swap(i, j int) { ls[i], ls[j] = ls[j], ls[i] }
func (ls Labels) Less(i, j int) bool { return ls[i].Name < ls[j].Name }
// NewLabelSet returns a sorted LabelSet from the given labels.
// Hash returns a hash value for the label set.
func (ls Labels) Hash() uint64 {
b := make([]byte, 0, 512)
for _, v := range ls {
b = append(b, v.Name...)
b = append(b, '\xff')
b = append(b, v.Value...)
b = append(b, '\xff')
}
return xxhash.Sum64(b)
}
// Get returns the value for the label with the given name.
// Returns an empty string if the label doesn't exist.
func (ls Labels) Get(name string) string {
for _, l := range ls {
if l.Name == name {
return l.Value
}
}
return ""
}
// Equals returns whether the two label sets are equal.
func (ls Labels) Equals(o Labels) bool {
if len(ls) != len(o) {
return false
}
for i, l := range ls {
if l.Name != o[i].Name || l.Value != o[i].Value {
return false
}
}
return true
}
// Map returns a string map of the labels.
func (ls Labels) Map() map[string]string {
m := make(map[string]string, len(ls))
for _, l := range ls {
m[l.Name] = l.Value
}
return m
}
// NewLabels returns a sorted Labels from the given labels.
// The caller has to guarantee that all label names are unique.
func NewLabelSet(ls ...Label) LabelSet {
set := make(LabelSet, 0, len(l))
func NewLabels(ls ...Label) Labels {
set := make(Labels, 0, len(ls))
for _, l := range ls {
set = append(set, l)
}
@ -66,11 +194,41 @@ func NewLabelSet(ls ...Label) LabelSet {
return set
}
type Vector struct {
LabelSets []LabelSet
Values []float64
// LabelsFromMap returns new sorted Labels from the given map.
func LabelsFromMap(m map[string]string) Labels {
l := make([]Label, 0, len(m))
for k, v := range m {
l = append(l, Label{Name: k, Value: v})
}
return NewLabels(l...)
}
func (db *DB) AppendVector(v *Vector) error {
// Vector is a set of LabelSet associated with one value each.
// Label sets and values must have equal length.
type Vector struct {
LabelSets []Labels
Values []float64
}
// AppendVector adds values for a list of label sets for the given timestamp
// in milliseconds.
func (db *DB) AppendVector(ts int64, v *Vector) error {
// Sequentially add samples to shards.
for i, ls := range v.LabelSets {
h := ls.Hash()
shard := db.shards[h>>(64-uint(len(db.shards)))]
// TODO(fabxc): benchmark whether grouping into shards and submitting to
// shards in batches is more efficient.
shard.head.mtx.Lock()
if err := shard.head.append(h, ls, ts, v.Values[i]); err != nil {
shard.head.mtx.Unlock()
// TODO(fabxc): handle gracefully and collect multi-error.
return err
}
shard.head.mtx.Unlock()
}
return nil
}
}

62
db_test.go Normal file
View File

@ -0,0 +1,62 @@
package tsdb
import "testing"
func BenchmarkLabelSetFromMap(b *testing.B) {
m := map[string]string{
"job": "node",
"instance": "123.123.1.211:9090",
"path": "/api/v1/namespaces/<namespace>/deployments/<name>",
"method": "GET",
"namespace": "system",
"status": "500",
}
var ls Labels
b.ReportAllocs()
for i := 0; i < b.N; i++ {
ls = LabelsFromMap(m)
}
_ = ls
}
func BenchmarkMapFromLabels(b *testing.B) {
m := map[string]string{
"job": "node",
"instance": "123.123.1.211:9090",
"path": "/api/v1/namespaces/<namespace>/deployments/<name>",
"method": "GET",
"namespace": "system",
"status": "500",
}
ls := LabelsFromMap(m)
b.ResetTimer()
b.ReportAllocs()
for i := 0; i < b.N; i++ {
m = ls.Map()
}
}
func BenchmarkLabelSetEquals(b *testing.B) {
// The vast majority of comparisons will be against a matching label set.
m := map[string]string{
"job": "node",
"instance": "123.123.1.211:9090",
"path": "/api/v1/namespaces/<namespace>/deployments/<name>",
"method": "GET",
"namespace": "system",
"status": "500",
}
ls := LabelsFromMap(m)
var res bool
b.ResetTimer()
b.ReportAllocs()
for i := 0; i < b.N; i++ {
res = ls.Equals(ls)
}
_ = res
}

135
index.go Normal file
View File

@ -0,0 +1,135 @@
package tsdb
import "sort"
// Index provides read access to an inverted index.
type Index interface {
Postings(ref uint32) Iterator
}
// memIndex is an inverted in-memory index.
type memIndex struct {
lastID uint32
m map[string][]uint32
}
// Postings returns an iterator over the postings list for s.
func (ix *memIndex) Postings(s string) Iterator {
return &listIterator{list: ix.m[s]}
}
// add adds a document to the index. The caller has to ensure that no
// term argument appears twice.
func (ix *memIndex) add(terms ...string) uint32 {
ix.lastID++
for _, t := range terms {
ix.m[t] = append(ix.m[t], ix.lastID)
}
return ix.lastID
}
// newMemIndex returns a new in-memory index.
func newMemIndex() *memIndex {
return &memIndex{m: make(map[string][]uint32)}
}
// Iterator provides iterative access over a postings list.
type Iterator interface {
// Next advances the iterator and returns true if another
// value was found.
Next() bool
// Seek advances the iterator to value v or greater and returns
// true if a value was found.
Seek(v uint32) bool
// Value returns the value at the current iterator position.
Value() uint32
}
// compressIndex returns a compressed index for the given input index.
func compressIndex(ix Index) {
}
// Intersect returns a new iterator over the intersection of the
// input iterators.
func Intersect(its ...Iterator) Iterator {
if len(its) == 0 {
return nil
}
a := its[0]
for _, b := range its[1:] {
a = &intersectIterator{a: a, b: b}
}
return a
}
type intersectIterator struct {
a, b Iterator
}
func (it *intersectIterator) Value() uint32 {
return 0
}
func (it *intersectIterator) Next() bool {
return false
}
func (it *intersectIterator) Seek(id uint32) bool {
return false
}
// Merge returns a new iterator over the union of the input iterators.
func Merge(its ...Iterator) Iterator {
if len(its) == 0 {
return nil
}
a := its[0]
for _, b := range its[1:] {
a = &mergeIterator{a: a, b: b}
}
return a
}
type mergeIterator struct {
a, b Iterator
}
func (it *mergeIterator) Value() uint32 {
return 0
}
func (it *mergeIterator) Next() bool {
return false
}
func (it *mergeIterator) Seek(id uint32) bool {
return false
}
// listIterator implements the Iterator interface over a plain list.
type listIterator struct {
list []uint32
idx int
}
func (it *listIterator) Value() uint32 {
return it.list[it.idx]
}
func (it *listIterator) Next() bool {
it.idx++
return it.idx < len(it.list)
}
func (it *listIterator) Seek(x uint32) bool {
// Do binary search between current position and end.
it.idx = sort.Search(len(it.list)-it.idx, func(i int) bool {
return it.list[i+it.idx] >= x
})
return it.idx < len(it.list)
}

126
shard.go
View File

@ -0,0 +1,126 @@
package tsdb
import (
"fmt"
"io"
"math"
"sync"
"github.com/fabxc/tsdb/chunks"
)
const sep = '\xff'
// SeriesShard handles reads and writes of time series falling into
// a hashed shard of a series.
type SeriesShard struct {
mtx sync.RWMutex
blocks *Block
head *HeadBlock
}
// NewSeriesShard returns a new SeriesShard.
func NewSeriesShard() *SeriesShard {
return &SeriesShard{
// TODO(fabxc): restore from checkpoint.
head: &HeadBlock{
index: newMemIndex(),
descs: map[uint64][]*chunkDesc{},
values: map[string][]string{},
forward: map[uint32]*chunkDesc{},
},
// TODO(fabxc): provide access to persisted blocks.
}
}
// HeadBlock handles reads and writes of time series data within a time window.
type HeadBlock struct {
mtx sync.RWMutex
descs map[uint64][]*chunkDesc // labels hash to possible chunks descs
forward map[uint32]*chunkDesc // chunk ID to chunk desc
values map[string][]string // label names to possible values
index *memIndex // inverted index for label pairs
}
// Block handles reads against a completed block of time series data within a time window.
type Block struct {
}
// WriteTo serializes the current head block contents into w.
func (h *HeadBlock) WriteTo(w io.Writer) (int64, error) {
h.mtx.RLock()
defer h.mtx.RUnlock()
return 0, fmt.Errorf("not implemented")
}
// get retrieves the chunk with the hash and label set and creates
// a new one if it doesn't exist yet.
func (h *HeadBlock) get(hash uint64, lset Labels) (*chunkDesc, bool) {
cds := h.descs[hash]
for _, cd := range cds {
if cd.lset.Equals(lset) {
return cd, false
}
}
// None of the given chunks was for the series, create a new one.
cd := &chunkDesc{
lset: lset,
chunk: chunks.NewXORChunk(int(math.MaxInt64)),
}
h.descs[hash] = append(cds, cd)
return cd, true
}
// append adds the sample to the headblock. If the series is seen
// for the first time it creates a chunk and index entries for it.
//
// TODO(fabxc): switch to single writer and append queue with optimistic concurrency?
func (h *HeadBlock) append(hash uint64, lset Labels, ts int64, v float64) error {
chkd, created := h.get(hash, lset)
if created {
// Add each label pair as a term to the inverted index.
terms := make([]string, 0, len(lset))
b := make([]byte, 0, 64)
for _, l := range lset {
b = append(b, l.Name...)
b = append(b, sep)
b = append(b, l.Value...)
terms = append(terms, string(b))
b = b[:0]
}
id := h.index.add(terms...)
// Store forward index for the returned ID.
h.forward[id] = chkd
}
return chkd.append(ts, v)
}
// chunkDesc wraps a plain data chunk and provides cached meta data about it.
type chunkDesc struct {
lset Labels
chunk chunks.Chunk
// Caching fields.
lastTimestamp int64
lastValue float64
app chunks.Appender // Current appender for the chunks.
}
func (cd *chunkDesc) append(ts int64, v float64) (err error) {
if cd.app == nil {
cd.app, err = cd.chunk.Appender()
if err != nil {
return err
}
}
cd.lastTimestamp = ts
cd.lastValue = v
return cd.app.Append(ts, v)
}

26
test/conv_test.go Normal file
View File

@ -0,0 +1,26 @@
package test
import "testing"
func BenchmarkMapConversion(b *testing.B) {
type key string
type val string
m := map[key]val{
"job": "node",
"instance": "123.123.1.211:9090",
"path": "/api/v1/namespaces/<namespace>/deployments/<name>",
"method": "GET",
"namespace": "system",
"status": "500",
}
var sm map[string]string
for i := 0; i < b.N; i++ {
sm = make(map[string]string, len(m))
for k, v := range m {
sm[string(k)] = string(v)
}
}
}

50
test/labels_test.go Normal file
View File

@ -0,0 +1,50 @@
package test
import (
"testing"
"github.com/fabxc/tsdb"
)
func BenchmarkLabelMapAccess(b *testing.B) {
m := map[string]string{
"job": "node",
"instance": "123.123.1.211:9090",
"path": "/api/v1/namespaces/<namespace>/deployments/<name>",
"method": "GET",
"namespace": "system",
"status": "500",
}
var v string
for k := range m {
b.Run(k, func(b *testing.B) {
for i := 0; i < b.N; i++ {
v = m[k]
}
})
}
}
func BenchmarkLabelSetAccess(b *testing.B) {
m := map[string]string{
"job": "node",
"instance": "123.123.1.211:9090",
"path": "/api/v1/namespaces/<namespace>/deployments/<name>",
"method": "GET",
"namespace": "system",
"status": "500",
}
ls := tsdb.LabelsFromMap(m)
var v string
for _, l := range ls {
b.Run(l.Name, func(b *testing.B) {
for i := 0; i < b.N; i++ {
v = ls.Get(l.Name)
}
})
}
}