prometheus/db.go

609 lines
13 KiB
Go
Raw Normal View History

2016-11-15 09:34:25 +00:00
// Package tsdb implements a time series storage for float64 sample data.
package tsdb
import (
"bytes"
2016-12-04 12:16:11 +00:00
"fmt"
"io/ioutil"
"math"
2016-12-04 12:16:11 +00:00
"os"
"path/filepath"
"reflect"
2016-12-15 07:31:26 +00:00
"strconv"
"sync"
"unsafe"
2016-11-15 09:34:25 +00:00
2016-12-15 07:31:26 +00:00
"golang.org/x/sync/errgroup"
2017-01-03 14:43:26 +00:00
"github.com/coreos/etcd/pkg/fileutil"
"github.com/fabxc/tsdb/chunks"
2016-12-21 08:39:01 +00:00
"github.com/fabxc/tsdb/labels"
"github.com/go-kit/kit/log"
2017-01-03 14:43:26 +00:00
"github.com/pkg/errors"
2016-12-31 08:48:49 +00:00
"github.com/prometheus/client_golang/prometheus"
2016-11-15 09:34:25 +00:00
)
2016-12-09 09:00:14 +00:00
// DefaultOptions used for the DB. They are sane for setups using
// millisecond precision timestampdb.
2016-11-15 09:34:25 +00:00
var DefaultOptions = &Options{
2016-12-26 15:55:44 +00:00
Retention: 15 * 24 * 3600 * 1000, // 15 days
DisableWAL: false,
2016-11-15 09:34:25 +00:00
}
// Options of the DB storage.
type Options struct {
2016-12-26 15:55:44 +00:00
Retention int64
DisableWAL bool
2016-11-15 09:34:25 +00:00
}
// Appender allows committing batches of samples to a database.
// The data held by the appender is reset after Commit returndb.
type Appender interface {
// AddSeries registers a new known series label set with the appender
// and returns a reference number used to add samples to it over the
// life time of the Appender.
// AddSeries(Labels) uint64
// Add adds a sample pair for the referenced seriedb.
2016-12-29 10:03:39 +00:00
Add(lset labels.Labels, t int64, v float64) error
// Commit submits the collected samples and purges the batch.
Commit() error
}
type hashedSample struct {
hash uint64
2016-12-21 08:39:01 +00:00
labels labels.Labels
2016-12-22 11:05:24 +00:00
ref uint32
t int64
v float64
2016-12-09 15:54:38 +00:00
}
2016-12-09 09:00:14 +00:00
const sep = '\xff'
// DB handles reads and writes of time series falling into
// a hashed partition of a seriedb.
type DB struct {
dir string
2017-01-02 21:24:35 +00:00
logger log.Logger
metrics *dbMetrics
2016-12-09 09:00:14 +00:00
2016-12-15 07:31:26 +00:00
mtx sync.RWMutex
2017-01-03 14:43:26 +00:00
persisted []*persistedBlock
heads []*HeadBlock
compactor *compactor
2016-12-09 09:00:14 +00:00
}
type dbMetrics struct {
2016-12-31 08:48:49 +00:00
persistences prometheus.Counter
persistenceDuration prometheus.Histogram
samplesAppended prometheus.Counter
}
func newDBMetrics(r prometheus.Registerer) *dbMetrics {
m := &dbMetrics{}
2017-01-03 14:43:26 +00:00
m.persistences = prometheus.NewCounter(prometheus.CounterOpts{
Name: "tsdb_persistences_total",
Help: "Total number of head persistances that ran so far.",
2017-01-03 14:43:26 +00:00
})
m.persistenceDuration = prometheus.NewHistogram(prometheus.HistogramOpts{
Name: "tsdb_persistence_duration_seconds",
Help: "Duration of persistences in seconddb.",
Buckets: prometheus.ExponentialBuckets(0.25, 2, 5),
2017-01-03 14:43:26 +00:00
})
m.samplesAppended = prometheus.NewCounter(prometheus.CounterOpts{
Name: "tsdb_samples_appended_total",
Help: "Total number of appended sampledb.",
2017-01-03 14:43:26 +00:00
})
2016-12-31 08:48:49 +00:00
if r != nil {
r.MustRegister(
m.persistences,
m.persistenceDuration,
m.samplesAppended,
)
}
return m
}
// Open returns a new DB in the given directory.
func Open(dir string, logger log.Logger) (p *DB, err error) {
2017-01-06 07:08:02 +00:00
// Create directory if partition is new.
if !fileutil.Exist(dir) {
if err := os.MkdirAll(dir, 0777); err != nil {
2016-12-15 07:31:26 +00:00
return nil, err
}
}
p = &DB{
dir: dir,
logger: logger,
metrics: newDBMetrics(nil),
}
if err := p.initBlocks(); err != nil {
2016-12-15 07:31:26 +00:00
return nil, err
}
if p.compactor, err = newCompactor(p, logger); err != nil {
return nil, err
}
return p, nil
}
2016-12-15 07:31:26 +00:00
func isBlockDir(fi os.FileInfo) bool {
if !fi.IsDir() {
return false
2016-12-22 11:05:24 +00:00
}
if _, err := strconv.ParseUint(fi.Name(), 10, 32); err != nil {
return false
2016-12-22 11:05:24 +00:00
}
return true
}
func (db *DB) initBlocks() error {
var (
pbs []*persistedBlock
heads []*HeadBlock
)
2016-12-22 11:05:24 +00:00
files, err := ioutil.ReadDir(db.dir)
if err != nil {
return err
2016-12-09 09:00:14 +00:00
}
for _, fi := range files {
if !isBlockDir(fi) {
continue
}
dir := filepath.Join(db.dir, fi.Name())
if fileutil.Exist(filepath.Join(dir, walFileName)) {
h, err := OpenHeadBlock(dir)
if err != nil {
return err
}
heads = append(heads, h)
continue
}
b, err := newPersistedBlock(dir)
if err != nil {
return err
}
pbs = append(pbs, b)
}
// Validate that blocks are sequential in time.
lastTime := int64(math.MinInt64)
2016-12-09 12:41:38 +00:00
for _, b := range pbs {
if b.stats().MinTime < lastTime {
return errors.Errorf("illegal order for block at %q", b.dir())
}
lastTime = b.stats().MaxTime
}
for _, b := range heads {
if b.stats().MinTime < lastTime {
return errors.Errorf("illegal order for block at %q", b.dir())
}
lastTime = b.stats().MaxTime
}
2017-01-03 14:43:26 +00:00
db.persisted = pbs
db.heads = heads
2017-01-03 14:43:26 +00:00
if len(heads) == 0 {
return db.cut()
2017-01-02 21:24:35 +00:00
}
return nil
2017-01-02 21:24:35 +00:00
}
2017-01-06 07:08:02 +00:00
// Close the partition.
func (db *DB) Close() error {
2017-01-02 21:24:35 +00:00
var merr MultiError
merr.Add(db.compactor.Close())
2016-12-15 07:31:26 +00:00
db.mtx.Lock()
defer db.mtx.Unlock()
for _, pb := range db.persisted {
2017-01-02 21:24:35 +00:00
merr.Add(pb.Close())
2016-12-15 07:31:26 +00:00
}
for _, hb := range db.heads {
2017-01-03 14:43:26 +00:00
merr.Add(hb.Close())
}
2016-12-15 07:31:26 +00:00
2017-01-02 21:24:35 +00:00
return merr.Err()
2016-12-09 09:00:14 +00:00
}
func (db *DB) appendBatch(samples []hashedSample) error {
if len(samples) == 0 {
return nil
}
db.mtx.Lock()
defer db.mtx.Unlock()
2016-12-09 12:41:38 +00:00
head := db.heads[len(db.heads)-1]
2017-01-03 14:43:26 +00:00
2016-12-22 00:12:28 +00:00
// TODO(fabxc): distinguish samples between concurrent heads for
// different time blocks. Those may occurr during transition to still
// allow late samples to arrive for a previous block.
2017-01-03 14:43:26 +00:00
err := head.appendBatch(samples)
if err == nil {
db.metrics.samplesAppended.Add(float64(len(samples)))
}
2016-12-09 12:41:38 +00:00
2016-12-22 00:12:28 +00:00
// TODO(fabxc): randomize over time and use better scoring function.
if head.bstats.SampleCount/(uint64(head.bstats.ChunkCount)+1) > 250 {
if err := db.cut(); err != nil {
db.logger.Log("msg", "cut failed", "err", err)
} else {
db.compactor.trigger()
2016-12-09 12:41:38 +00:00
}
}
2016-12-22 00:12:28 +00:00
return err
2016-12-09 12:41:38 +00:00
}
func (db *DB) lock() sync.Locker {
return &db.mtx
2017-01-03 14:43:26 +00:00
}
func (db *DB) headForDir(dir string) (int, bool) {
for i, b := range db.heads {
2017-01-03 14:43:26 +00:00
if b.dir() == dir {
return i, true
}
}
return -1, false
}
func (db *DB) persistedForDir(dir string) (int, bool) {
for i, b := range db.persisted {
2017-01-03 14:43:26 +00:00
if b.dir() == dir {
return i, true
}
}
return -1, false
}
func (db *DB) reinit(dir string) error {
2017-01-03 14:43:26 +00:00
if !fileutil.Exist(dir) {
if i, ok := db.headForDir(dir); ok {
if err := db.heads[i].Close(); err != nil {
2017-01-03 14:43:26 +00:00
return err
}
db.heads = append(db.heads[:i], db.heads[i+1:]...)
2017-01-03 14:43:26 +00:00
}
if i, ok := db.persistedForDir(dir); ok {
if err := db.persisted[i].Close(); err != nil {
2017-01-03 14:43:26 +00:00
return err
}
db.persisted = append(db.persisted[:i], db.persisted[i+1:]...)
2017-01-03 14:43:26 +00:00
}
return nil
}
// Remove a previous head block.
if i, ok := db.headForDir(dir); ok {
if err := db.heads[i].Close(); err != nil {
2017-01-03 14:43:26 +00:00
return err
}
db.heads = append(db.heads[:i], db.heads[i+1:]...)
2017-01-03 14:43:26 +00:00
}
// Close an old persisted block.
i, ok := db.persistedForDir(dir)
2017-01-03 14:43:26 +00:00
if ok {
if err := db.persisted[i].Close(); err != nil {
2017-01-03 14:43:26 +00:00
return err
}
}
pb, err := newPersistedBlock(dir)
if err != nil {
return errors.Wrap(err, "open persisted block")
}
if i >= 0 {
db.persisted[i] = pb
2017-01-03 14:43:26 +00:00
} else {
db.persisted = append(db.persisted, pb)
2017-01-03 14:43:26 +00:00
}
return nil
}
func (db *DB) compactable() []block {
2017-01-03 14:43:26 +00:00
var blocks []block
for _, pb := range db.persisted {
blocks = append([]block{pb}, blocks...)
2017-01-03 14:43:26 +00:00
}
// threshold := db.heads[len(db.heads)-1].bstatdb.MaxTime - headGracePeriod
2017-01-03 14:43:26 +00:00
// for _, hb := range db.heads {
// if hb.bstatdb.MaxTime < threshold {
2017-01-03 14:43:26 +00:00
// blocks = append(blocks, hb)
// }
// }
for _, hb := range db.heads[:len(db.heads)-1] {
blocks = append([]block{hb}, blocks...)
2017-01-03 14:43:26 +00:00
}
return blocks
}
func intervalOverlap(amin, amax, bmin, bmax int64) bool {
if bmin >= amin && bmin <= amax {
return true
}
if amin >= bmin && amin <= bmax {
return true
}
return false
}
func intervalContains(min, max, t int64) bool {
return t >= min && t <= max
}
// blocksForInterval returns all blocks within the partition that may contain
2016-12-13 14:26:58 +00:00
// data for the given time range.
func (db *DB) blocksForInterval(mint, maxt int64) []block {
var bs []block
for _, b := range db.persisted {
bmin, bmax := b.interval()
if intervalOverlap(mint, maxt, bmin, bmax) {
bs = append(bs, b)
}
}
for _, b := range db.heads {
2017-01-03 14:43:26 +00:00
bmin, bmax := b.interval()
2017-01-03 14:43:26 +00:00
if intervalOverlap(mint, maxt, bmin, bmax) {
bs = append(bs, b)
}
}
return bs
2016-12-13 14:26:58 +00:00
}
2016-12-09 12:41:38 +00:00
// TODO(fabxc): make configurable.
2017-01-03 14:43:26 +00:00
const headGracePeriod = 60 * 1000 // 60 seconds for millisecond scale
2016-12-09 12:41:38 +00:00
2017-01-03 14:43:26 +00:00
// cut starts a new head block to append to. The completed head block
// will still be appendable for the configured grace period.
func (p *DB) cut() error {
dir, err := p.nextBlockDir()
2016-12-22 11:05:24 +00:00
if err != nil {
return err
}
newHead, err := OpenHeadBlock(dir)
if err != nil {
return err
}
p.heads = append(p.heads, newHead)
2016-12-09 12:41:38 +00:00
2017-01-03 14:43:26 +00:00
return nil
}
2016-12-09 12:41:38 +00:00
func (p *DB) nextBlockDir() (string, error) {
names, err := fileutil.ReadDir(p.dir)
if err != nil {
return "", err
}
i := uint64(0)
if len(names) > 0 {
if i, err = strconv.ParseUint(names[len(names)-1], 10, 32); err != nil {
return "", err
}
}
return filepath.Join(p.dir, fmt.Sprintf("%0.6d", i+1)), nil
}
2016-12-09 12:41:38 +00:00
2016-12-09 09:00:14 +00:00
// chunkDesc wraps a plain data chunk and provides cached meta data about it.
type chunkDesc struct {
ref uint32
2016-12-21 08:39:01 +00:00
lset labels.Labels
2016-12-09 09:00:14 +00:00
chunk chunks.Chunk
// Caching fielddb.
firstTimestamp int64
lastTimestamp int64
lastValue float64
numSamples int
2016-12-09 09:00:14 +00:00
app chunks.Appender // Current appender for the chunkdb.
2016-12-09 09:00:14 +00:00
}
2016-12-31 09:10:27 +00:00
func (cd *chunkDesc) append(ts int64, v float64) {
if cd.numSamples == 0 {
cd.firstTimestamp = ts
2016-12-09 09:00:14 +00:00
}
2016-12-31 09:10:27 +00:00
cd.app.Append(ts, v)
2016-12-09 09:00:14 +00:00
cd.lastTimestamp = ts
cd.lastValue = v
2016-12-22 19:57:00 +00:00
cd.numSamples++
2016-12-09 09:00:14 +00:00
}
// PartitionedDB is a time series storage.
type PartitionedDB struct {
logger log.Logger
opts *Options
dir string
partitionPow uint
Partitions []*DB
}
func isPowTwo(x int) bool {
return x > 0 && (x&(x-1)) == 0
}
// OpenPartitioned or create a new DB.
func OpenPartitioned(dir string, n int, l log.Logger, opts *Options) (*PartitionedDB, error) {
if !isPowTwo(n) {
return nil, errors.Errorf("%d is not a power of two", n)
}
if opts == nil {
opts = DefaultOptions
}
if l == nil {
l = log.NewLogfmtLogger(os.Stdout)
l = log.NewContext(l).With("ts", log.DefaultTimestampUTC, "caller", log.DefaultCaller)
}
if err := os.MkdirAll(dir, 0777); err != nil {
return nil, err
}
c := &PartitionedDB{
logger: l,
opts: opts,
dir: dir,
partitionPow: uint(math.Log2(float64(n))),
}
// Initialize vertical partitiondb.
// TODO(fabxc): validate partition number to be power of 2, which is required
// for the bitshift-modulo when finding the right partition.
for i := 0; i < n; i++ {
l := log.NewContext(l).With("partition", i)
d := partitionDir(dir, i)
s, err := Open(d, l)
if err != nil {
return nil, fmt.Errorf("initializing partition %q failed: %s", d, err)
}
c.Partitions = append(c.Partitions, s)
}
return c, nil
}
func partitionDir(base string, i int) string {
return filepath.Join(base, fmt.Sprintf("p-%0.4d", i))
}
// Close the database.
func (db *PartitionedDB) Close() error {
var g errgroup.Group
for _, partition := range db.Partitions {
g.Go(partition.Close)
}
return g.Wait()
}
// Appender returns a new appender against the database.
func (db *PartitionedDB) Appender() Appender {
return &partitionedAppender{
db: db,
buckets: make([][]hashedSample, len(db.Partitions)),
}
}
type partitionedAppender struct {
db *PartitionedDB
buckets [][]hashedSample
}
func (ba *partitionedAppender) Add(lset labels.Labels, t int64, v float64) error {
h := lset.Hash()
s := h >> (64 - ba.db.partitionPow)
ba.buckets[s] = append(ba.buckets[s], hashedSample{
hash: h,
labels: lset,
t: t,
v: v,
})
return nil
}
func (ba *partitionedAppender) reset() {
for i := range ba.buckets {
ba.buckets[i] = ba.buckets[i][:0]
}
}
func (ba *partitionedAppender) Commit() error {
defer ba.reset()
var merr MultiError
// Spill buckets into partitiondb.
for s, b := range ba.buckets {
merr.Add(ba.db.Partitions[s].appendBatch(b))
}
return merr.Err()
}
// The MultiError type implements the error interface, and contains the
// Errors used to construct it.
type MultiError []error
// Returns a concatenated string of the contained errors
func (es MultiError) Error() string {
var buf bytes.Buffer
if len(es) > 1 {
fmt.Fprintf(&buf, "%d errors: ", len(es))
2016-12-08 09:04:24 +00:00
}
for i, err := range es {
if i != 0 {
buf.WriteString("; ")
}
buf.WriteString(err.Error())
}
return buf.String()
2016-11-15 09:34:25 +00:00
}
2016-12-15 07:31:26 +00:00
// Add adds the error to the error list if it is not nil.
func (es *MultiError) Add(err error) {
if err == nil {
return
}
if merr, ok := err.(MultiError); ok {
*es = append(*es, merr...)
} else {
*es = append(*es, err)
2016-12-15 07:31:26 +00:00
}
}
// Err returns the error list as an error or nil if it is empty.
2016-12-15 07:31:26 +00:00
func (es MultiError) Err() error {
if len(es) == 0 {
return nil
}
return es
}
func yoloString(b []byte) string {
sh := (*reflect.SliceHeader)(unsafe.Pointer(&b))
h := reflect.StringHeader{
Data: sh.Data,
Len: sh.Len,
}
return *((*string)(unsafe.Pointer(&h)))
}
func yoloBytes(s string) []byte {
sh := (*reflect.StringHeader)(unsafe.Pointer(&s))
h := reflect.SliceHeader{
Cap: sh.Len,
Len: sh.Len,
Data: sh.Data,
}
return *((*[]byte)(unsafe.Pointer(&h)))
}