prometheus/db.go

540 lines
11 KiB
Go
Raw Normal View History

2016-11-15 09:34:25 +00:00
// Package tsdb implements a time series storage for float64 sample data.
package tsdb
import (
"bytes"
2016-12-04 12:16:11 +00:00
"fmt"
"os"
"path/filepath"
"reflect"
2016-12-04 12:16:11 +00:00
"sort"
2016-12-15 07:31:26 +00:00
"strconv"
"sync"
2016-11-15 09:34:25 +00:00
"time"
"unsafe"
2016-11-15 09:34:25 +00:00
2016-12-15 07:31:26 +00:00
"golang.org/x/sync/errgroup"
2016-12-04 12:16:11 +00:00
"github.com/cespare/xxhash"
"github.com/fabxc/tsdb/chunks"
"github.com/go-kit/kit/log"
2016-11-15 09:34:25 +00:00
)
2016-12-09 09:00:14 +00:00
// DefaultOptions used for the DB. They are sane for setups using
// millisecond precision timestamps.
var DefaultOptions = &Options{
	Retention: 15 * 24 * 3600 * 1000, // 15 days
}

// Options of the DB storage.
type Options struct {
	// Retention is expressed in the unit of the sample timestamps; the
	// default is 15 days at millisecond precision.
	// NOTE(review): not read anywhere in this file — confirm how it is enforced.
	Retention int64
}
// DB is a time series storage.
type DB struct {
logger log.Logger
opts *Options
path string
2016-11-15 09:34:25 +00:00
2016-12-15 07:36:09 +00:00
shards []*Shard
2016-11-15 09:34:25 +00:00
}
2016-12-04 12:16:11 +00:00
// TODO(fabxc): make configurable
const (
	// shardShift is the number of leading label-hash bits that select a shard.
	shardShift = 0
	// numShards is the number of shards opened by Open; by construction a
	// power of two.
	numShards = 1 << shardShift
	// maxChunkSize is not referenced in this file.
	// NOTE(review): presumably an upper bound for a chunk's size — confirm unit and users.
	maxChunkSize = 1024
)
2016-11-15 09:34:25 +00:00
// Open or create a new DB.
func Open(path string, l log.Logger, opts *Options) (*DB, error) {
if opts == nil {
opts = DefaultOptions
}
2016-12-04 12:16:11 +00:00
if err := os.MkdirAll(path, 0777); err != nil {
return nil, err
}
if l == nil {
l = log.NewLogfmtLogger(os.Stdout)
l = log.NewContext(l).With("ts", log.DefaultTimestampUTC, "caller", log.DefaultCaller)
}
2016-11-15 09:34:25 +00:00
c := &DB{
2016-12-04 12:16:11 +00:00
logger: l,
opts: opts,
path: path,
2016-11-15 09:34:25 +00:00
}
2016-12-04 12:16:11 +00:00
// Initialize vertical shards.
// TODO(fabxc): validate shard number to be power of 2, which is required
// for the bitshift-modulo when finding the right shard.
2016-12-15 07:36:09 +00:00
for i := 0; i < numShards; i++ {
l := log.NewContext(l).With("shard", i)
2016-12-15 07:31:26 +00:00
d := shardDir(path, i)
2016-12-15 07:36:09 +00:00
s, err := OpenShard(d, l)
2016-12-15 07:31:26 +00:00
if err != nil {
return nil, fmt.Errorf("initializing shard %q failed: %s", d, err)
}
c.shards = append(c.shards, s)
2016-12-04 12:16:11 +00:00
}
// TODO(fabxc): run background compaction + GC.
2016-11-15 09:34:25 +00:00
return c, nil
}
2016-12-15 07:31:26 +00:00
// shardDir returns the directory of shard i below base. The directory is
// named after the decimal shard index.
func shardDir(base string, i int) string {
	name := strconv.Itoa(i)
	return filepath.Join(base, name)
}
2016-12-04 12:16:11 +00:00
// Close the database.
func (db *DB) Close() error {
2016-12-15 07:31:26 +00:00
var g errgroup.Group
2016-12-09 09:00:14 +00:00
2016-12-15 07:31:26 +00:00
for _, shard := range db.shards {
// Fix closure argument to goroutine.
shard := shard
g.Go(shard.Close)
}
2016-12-09 09:00:14 +00:00
2016-12-15 07:31:26 +00:00
return g.Wait()
2016-12-04 12:16:11 +00:00
}
// Appender adds a batch of samples.
// Samples are collected with Add and submitted together with Commit.
// NOTE(review): no implementation is visible in this file, only a
// commented-out sketch — confirm the intended semantics.
type Appender interface {
// Add adds a sample pair to the appended batch.
// l is the sample's label set, t its timestamp and v its value.
Add(l Labels, t int64, v float64)
// Commit submits the collected samples.
Commit() error
}
// Vector is a batch of samples, one value per label set, grouped into
// buckets by the shard index derived from each label set's hash (see Add).
type Vector struct {
// Buckets maps a shard index to the samples collected for that shard.
Buckets map[uint16][]Sample
// reused counts the Reset calls that kept the allocated buckets.
reused int
}
// Sample is a single value together with the label set it belongs to.
type Sample struct {
// Hash is the hash of Labels, as produced by Labels.Hash.
Hash uint64
// Labels is the label set the value belongs to.
Labels Labels
// Value is the sample value.
Value float64
}
// Reset the vector but keep resources allocated.
//
// Buckets are truncated to length zero so their backing arrays can be
// reused. Once the vector has been reused more than 100 times (or has no
// bucket map yet) the map is replaced by a fresh one instead.
func (v *Vector) Reset() {
	// Do a full reset every n-th reusage to avoid memory leaks.
	if v.Buckets == nil || v.reused > 100 {
		v.Buckets = make(map[uint16][]Sample)
		// Start a new reuse cycle. Without this the counter stays above
		// the threshold and every later Reset would reallocate the map.
		v.reused = 0
		return
	}
	for x, bkt := range v.Buckets {
		v.Buckets[x] = bkt[:0]
	}
	v.reused++
}
// Add a sample to the vector.
func (v *Vector) Add(lset Labels, val float64) {
h := lset.Hash()
2016-12-15 07:36:09 +00:00
s := uint16(h >> (64 - shardShift))
v.Buckets[s] = append(v.Buckets[s], Sample{
Hash: h,
Labels: lset,
Value: val,
})
}
// func (db *DB) Appender() Appender {
// return &bucketAppender{
// samples: make([]Sample, 1024),
// }
// }
// type bucketAppender struct {
// db *DB
// // buckets []Sam
// }
// func (a *bucketAppender) Add(l Labels, t int64, v float64) {
// }
// func (a *bucketAppender) Commit() error {
// // f
// }
2016-12-09 12:41:38 +00:00
// AppendVector adds values for a list of label sets for the given timestamp
// in milliseconds.
//
// Each bucket of v is handed to the shard with the bucket's index. The
// first shard error aborts the call and is returned as is.
func (db *DB) AppendVector(ts int64, v *Vector) error {
	// Sequentially add samples to shards.
	for idx, batch := range v.Buckets {
		// TODO(fabxc): handle gracefully and collect multi-error.
		err := db.shards[idx].appendBatch(ts, batch)
		if err != nil {
			return err
		}
	}
	return nil
}
func (db *DB) AppendSingle(lset Labels, ts int64, v float64) error {
sort.Sort(lset)
2016-12-09 15:54:38 +00:00
h := lset.Hash()
2016-12-15 07:36:09 +00:00
s := uint16(h >> (64 - shardShift))
2016-12-09 15:54:38 +00:00
2016-12-09 19:45:46 +00:00
return db.shards[s].appendBatch(ts, []Sample{
{
Hash: h,
Labels: lset,
Value: v,
},
2016-12-09 15:54:38 +00:00
})
}
2016-12-09 09:00:14 +00:00
// sep is appended after every label name and every label value in the
// byte sequence hashed by Labels.Hash.
// NOTE(review): presumably 0xff because that byte cannot occur in valid
// UTF-8, so adjacent fields cannot run into each other — confirm.
const sep = '\xff'
2016-12-15 07:36:09 +00:00
// Shard handles reads and writes of time series falling into
2016-12-09 09:00:14 +00:00
// a hashed shard of a series.
2016-12-15 07:36:09 +00:00
type Shard struct {
2016-12-09 12:41:38 +00:00
path string
persistCh chan struct{}
logger log.Logger
2016-12-09 09:00:14 +00:00
2016-12-15 07:31:26 +00:00
mtx sync.RWMutex
persisted persistedBlocks
head *HeadBlock
2016-12-09 09:00:14 +00:00
}
2016-12-15 07:36:09 +00:00
// OpenShard returns a new Shard.
func OpenShard(path string, logger log.Logger) (*Shard, error) {
2016-12-15 07:31:26 +00:00
// Create directory if shard is new.
if _, err := os.Stat(path); os.IsNotExist(err) {
if err := os.MkdirAll(path, 0777); err != nil {
return nil, err
}
}
// Initialize previously persisted blocks.
pbs, err := findPersistedBlocks(path)
if err != nil {
return nil, err
}
2016-12-15 07:36:09 +00:00
s := &Shard{
2016-12-09 12:41:38 +00:00
path: path,
persistCh: make(chan struct{}, 1),
logger: logger,
2016-12-15 07:31:26 +00:00
persisted: pbs,
2016-12-09 09:00:14 +00:00
// TODO(fabxc): restore from checkpoint.
}
2016-12-09 12:41:38 +00:00
// TODO(fabxc): get base time from pre-existing blocks. Otherwise
// it should come from a user defined start timestamp.
// Use actual time for now.
s.head = NewHeadBlock(time.Now().UnixNano() / int64(time.Millisecond))
2016-12-15 07:31:26 +00:00
return s, nil
2016-12-09 12:41:38 +00:00
}
2016-12-15 07:36:09 +00:00
// Close the shard.
func (s *Shard) Close() error {
2016-12-15 07:31:26 +00:00
var e MultiError
for _, pb := range s.persisted {
e.Add(pb.Close())
}
return e.Err()
2016-12-09 09:00:14 +00:00
}
2016-12-15 07:36:09 +00:00
func (s *Shard) appendBatch(ts int64, samples []Sample) error {
2016-12-09 12:41:38 +00:00
// TODO(fabxc): make configurable.
const persistenceTimeThreshold = 1000 * 60 * 60 // 1 hour if timestamp in ms
s.mtx.Lock()
defer s.mtx.Unlock()
for _, smpl := range samples {
if err := s.head.append(smpl.Hash, smpl.Labels, ts, smpl.Value); err != nil {
// TODO(fabxc): handle gracefully and collect multi-error.
return err
}
}
if ts > s.head.stats.MaxTime {
s.head.stats.MaxTime = ts
2016-12-09 12:41:38 +00:00
}
// TODO(fabxc): randomize over time
if s.head.stats.SampleCount/uint64(s.head.stats.ChunkCount) > 400 {
2016-12-09 12:41:38 +00:00
select {
case s.persistCh <- struct{}{}:
go func() {
if err := s.persist(); err != nil {
s.logger.Log("msg", "persistance error", "err", err)
}
}()
2016-12-09 12:41:38 +00:00
default:
}
}
return nil
}
// intervalOverlap reports whether the closed intervals [amin, amax] and
// [bmin, bmax] overlap, i.e. whether one of them starts inside the other.
func intervalOverlap(amin, amax, bmin, bmax int64) bool {
	bStartsInA := amin <= bmin && bmin <= amax
	aStartsInB := bmin <= amin && amin <= bmax

	return bStartsInA || aStartsInB
}
// intervalContains reports whether t lies within the closed interval
// [min, max].
func intervalContains(min, max, t int64) bool {
	if t < min {
		return false
	}
	return t <= max
}
2016-12-13 14:26:58 +00:00
// blocksForRange returns all blocks within the shard that may contain
// data for the given time range.
func (s *Shard) blocksForInterval(mint, maxt int64) []block {
var bs []block
for _, b := range s.persisted {
bmin, bmax := b.interval()
if intervalOverlap(mint, maxt, bmin, bmax) {
bs = append(bs, b)
}
}
hmin, hmax := s.head.interval()
if intervalOverlap(mint, maxt, hmin, hmax) {
bs = append(bs, s.head)
}
fmt.Println("blocks for interval", bs)
return bs
2016-12-13 14:26:58 +00:00
}
2016-12-09 12:41:38 +00:00
// TODO(fabxc): make configurable.
// shardGracePeriod is not referenced in this file.
// NOTE(review): presumably the window in which a replaced head block may
// still receive appends (see the grace-period TODO in persist) — confirm.
const shardGracePeriod = 60 * 1000 // 60 seconds for millisecond scale
2016-12-15 07:36:09 +00:00
func (s *Shard) persist() error {
2016-12-09 12:41:38 +00:00
s.mtx.Lock()
// Set new head block.
head := s.head
s.head = NewHeadBlock(head.stats.MaxTime)
2016-12-09 12:41:38 +00:00
s.mtx.Unlock()
// Only allow another persistence to be triggered after the current one
// has completed (successful or not.)
2016-12-09 12:41:38 +00:00
defer func() {
<-s.persistCh
}()
// TODO(fabxc): add grace period where we can still append to old head shard
// before actually persisting it.
p := filepath.Join(s.path, fmt.Sprintf("%d", head.stats.MinTime))
2016-12-09 12:41:38 +00:00
if err := os.MkdirAll(p, 0777); err != nil {
return err
}
2016-12-19 21:37:03 +00:00
n, err := head.persist(p)
2016-12-09 20:23:34 +00:00
if err != nil {
return err
}
2016-12-19 21:37:03 +00:00
sz := fmt.Sprintf("%.2fMiB", float64(n)/1024/1024)
s.logger.Log("size", sz, "samples", head.stats.SampleCount, "chunks", head.stats.ChunkCount, "msg", "persisted head")
2016-12-09 12:41:38 +00:00
// Reopen block as persisted block for querying.
pb, err := newPersistedBlock(p)
if err != nil {
return err
}
s.mtx.Lock()
s.persisted = append(s.persisted, pb)
s.mtx.Unlock()
2016-12-09 12:41:38 +00:00
return nil
}
2016-12-09 09:00:14 +00:00
// chunkDesc wraps a plain data chunk and provides cached meta data about it.
type chunkDesc struct {
lset Labels
chunk chunks.Chunk
// Caching fields.
firsTimestamp int64
2016-12-09 09:00:14 +00:00
lastTimestamp int64
lastValue float64
app chunks.Appender // Current appender for the chunks.
}
func (cd *chunkDesc) append(ts int64, v float64) (err error) {
if cd.app == nil {
cd.app, err = cd.chunk.Appender()
if err != nil {
return err
}
cd.firsTimestamp = ts
2016-12-09 09:00:14 +00:00
}
if err := cd.app.Append(ts, v); err != nil {
return err
}
cd.lastTimestamp = ts
cd.lastValue = v
return nil
}
2016-12-04 12:16:11 +00:00
// Label is a key/value pair of strings.
type Label struct {
	Name, Value string
}
2016-12-04 12:16:11 +00:00
// Labels is a sorted set of labels. Order has to be guaranteed upon
2016-12-02 16:49:05 +00:00
// instantiation.
2016-12-04 12:16:11 +00:00
type Labels []Label
func (ls Labels) Len() int { return len(ls) }
func (ls Labels) Swap(i, j int) { ls[i], ls[j] = ls[j], ls[i] }
func (ls Labels) Less(i, j int) bool { return ls[i].Name < ls[j].Name }
// Hash returns a hash value for the label set.
func (ls Labels) Hash() uint64 {
2016-12-08 09:04:24 +00:00
b := make([]byte, 0, 1024)
2016-12-04 12:16:11 +00:00
for _, v := range ls {
b = append(b, v.Name...)
2016-12-05 20:25:20 +00:00
b = append(b, sep)
2016-12-04 12:16:11 +00:00
b = append(b, v.Value...)
2016-12-05 20:25:20 +00:00
b = append(b, sep)
2016-12-04 12:16:11 +00:00
}
return xxhash.Sum64(b)
}
// Get looks up the label called name and returns its value. The first
// match wins; the empty string is returned when no label has that name.
func (ls Labels) Get(name string) string {
	for i := range ls {
		if ls[i].Name != name {
			continue
		}
		return ls[i].Value
	}
	return ""
}
// Equals reports whether ls and o hold the same labels in the same order,
// comparing both names and values.
func (ls Labels) Equals(o Labels) bool {
	if len(ls) != len(o) {
		return false
	}
	for i := 0; i < len(ls); i++ {
		a, b := ls[i], o[i]
		if a.Name == b.Name && a.Value == b.Value {
			continue
		}
		return false
	}
	return true
}
2016-11-15 09:34:25 +00:00
2016-12-04 12:16:11 +00:00
// Map converts the labels into a freshly allocated map from label name to
// label value. If a name occurs more than once, the last value wins.
func (ls Labels) Map() map[string]string {
	out := make(map[string]string, len(ls))
	for i := range ls {
		out[ls[i].Name] = ls[i].Value
	}
	return out
}
2016-11-15 09:34:25 +00:00
2016-12-04 12:16:11 +00:00
// NewLabels returns a sorted Labels from the given labels.
2016-12-02 16:49:05 +00:00
// The caller has to guarantee that all label names are unique.
2016-12-04 12:16:11 +00:00
func NewLabels(ls ...Label) Labels {
set := make(Labels, 0, len(ls))
2016-12-02 16:49:05 +00:00
for _, l := range ls {
set = append(set, l)
2016-11-15 09:34:25 +00:00
}
2016-12-02 16:49:05 +00:00
sort.Sort(set)
2016-11-15 09:34:25 +00:00
2016-12-02 16:49:05 +00:00
return set
2016-11-15 09:34:25 +00:00
}
2016-12-04 12:16:11 +00:00
// LabelsFromMap builds Labels from the name/value pairs of m and returns
// them sorted, as produced by NewLabels.
func LabelsFromMap(m map[string]string) Labels {
	pairs := make([]Label, 0, len(m))
	for name, value := range m {
		pairs = append(pairs, Label{Name: name, Value: value})
	}
	return NewLabels(pairs...)
}
// The MultiError type implements the error interface, and contains the
// Errors used to construct it.
type MultiError []error

// Error returns a concatenated string of the contained errors, prefixed
// with their count. An empty MultiError yields the empty string.
func (es MultiError) Error() string {
	var buf bytes.Buffer

	if len(es) > 0 {
		fmt.Fprintf(&buf, "%d errors: ", len(es))
	}

	for i, err := range es {
		if i != 0 {
			buf.WriteString("; ")
		}
		buf.WriteString(err.Error())
	}

	return buf.String()
}

// Add adds the error to the error list if it is not nil.
//
// The receiver is a pointer: append produces a new slice header, which
// has to be stored back into the caller's variable. With a value receiver
// every added error would be lost.
func (es *MultiError) Add(err error) {
	if err != nil {
		*es = append(*es, err)
	}
}

// Err returns the error list as an error or nil if it is empty.
func (es MultiError) Err() error {
	if len(es) == 0 {
		return nil
	}
	return es
}
// yoloString returns a string that shares b's memory instead of copying it.
// The caller must not modify b while the returned string is in use.
// A nil or empty slice yields the empty string.
func yoloString(b []byte) string {
	// A string header is laid out like the first two words of a slice
	// header, so the slice can be reinterpreted in place. This never
	// indexes b (no panic on empty input) and never builds a
	// reflect.StringHeader value, which the unsafe rules forbid.
	return *(*string)(unsafe.Pointer(&b))
}
// yoloBytes returns a byte slice that shares s's memory instead of copying
// it. Length and capacity both equal len(s). The result must be treated
// as read-only, because strings are immutable.
func yoloBytes(s string) []byte {
	var b []byte

	// Fill in the header of a real slice variable through a pointer.
	// reflect.SliceHeader must not be built as a standalone value: its
	// Data field is a plain uintptr there and does not keep the memory
	// alive for the garbage collector.
	src := (*reflect.StringHeader)(unsafe.Pointer(&s))
	dst := (*reflect.SliceHeader)(unsafe.Pointer(&b))
	dst.Data = src.Data
	dst.Len = src.Len
	dst.Cap = src.Len

	return b
}