storage: extend appender and adapt it

This commit is contained in:
Fabian Reinartz 2017-01-13 14:48:01 +01:00
parent 304cae9928
commit ad9bc62e4c
10 changed files with 135 additions and 59 deletions

View File

@ -84,12 +84,6 @@ func Main() int {
return 1
}
sampleAppender, err := localStorage.Appender()
if err != nil {
log.Errorf("Creating sample appender failed: %s", err)
return 1
}
// reloadableRemoteStorage := remote.New()
// sampleAppender = append(sampleAppender, reloadableRemoteStorage)
// reloadables = append(reloadables, reloadableRemoteStorage)
@ -102,11 +96,11 @@ func Main() int {
)
ruleManager := rules.NewManager(&rules.ManagerOptions{
SampleAppender: sampleAppender,
Notifier: notifier,
QueryEngine: queryEngine,
Context: ctx,
ExternalURL: cfg.web.ExternalURL,
Appendable: localStorage,
Notifier: notifier,
QueryEngine: queryEngine,
Context: ctx,
ExternalURL: cfg.web.ExternalURL,
})
cfg.web.Context = ctx

View File

@ -14,6 +14,7 @@
package promql
import (
"fmt"
"math"
"regexp"
"sort"
@ -724,6 +725,8 @@ func funcHistogramQuantile(ev *evaluator, args Expressions) Value {
q := ev.evalFloat(args[0])
inVec := ev.evalVector(args[1])
fmt.Println("invec", inVec)
outVec := Vector{}
signatureToMetricWithBuckets := map[uint64]*metricWithBuckets{}
for _, el := range inVec {

View File

@ -280,14 +280,21 @@ func (cmd *loadCmd) set(m labels.Labels, vals ...sequenceValue) {
}
// append the defined time series to the storage.
func (cmd *loadCmd) append(a storage.Appender) {
func (cmd *loadCmd) append(a storage.Appender) error {
for h, smpls := range cmd.defs {
m := cmd.metrics[h]
for _, s := range smpls {
a.Add(m, s.T, s.V)
ref, err := a.SetSeries(m)
if err != nil {
return err
}
if err := a.Add(ref, s.T, s.V); err != nil {
return err
}
}
}
return nil
}
// evalCmd is a command that evaluates an expression for the given time (range)
@ -473,7 +480,10 @@ func (t *Test) exec(tc testCommand) error {
if err != nil {
return err
}
cmd.append(app)
if err := cmd.append(app); err != nil {
app.Rollback()
return err
}
if err := app.Commit(); err != nil {
return err

View File

@ -27,31 +27,41 @@ func (a nopAppendable) Appender() (storage.Appender, error) {
type nopAppender struct{}
func (a nopAppender) Add(l labels.Labels, t int64, v float64) error { return nil }
func (a nopAppender) Commit() error { return nil }
func (a nopAppender) SetSeries(labels.Labels) (uint64, error) { return 0, nil }
func (a nopAppender) Add(uint64, int64, float64) error { return nil }
func (a nopAppender) Commit() error { return nil }
func (a nopAppender) Rollback() error { return nil }
type collectResultAppender struct {
result []sample
throttled bool
refs map[uint64]labels.Labels
result []sample
}
func (a *collectResultAppender) Add(l labels.Labels, t int64, v float64) error {
func (a *collectResultAppender) SetSeries(l labels.Labels) (uint64, error) {
if a.refs == nil {
a.refs = map[uint64]labels.Labels{}
}
ref := uint64(len(a.refs))
a.refs[ref] = l
return ref, nil
}
func (a *collectResultAppender) Add(ref uint64, t int64, v float64) error {
// for ln, lv := range s.Metric {
// if len(lv) == 0 {
// delete(s.Metric, ln)
// }
// }
a.result = append(a.result, sample{
metric: l,
metric: a.refs[ref],
t: t,
v: v,
})
return nil
}
func (a *collectResultAppender) Commit() error {
return nil
}
func (a *collectResultAppender) Commit() error { return nil }
func (a *collectResultAppender) Rollback() error { return nil }
// fakeTargetProvider implements a TargetProvider and allows manual injection
// of TargetGroups through the update channel.

View File

@ -102,7 +102,7 @@ type scrapePool struct {
loops map[uint64]loop
// Constructor for new scrape loops. This is settable for testing convenience.
newLoop func(context.Context, scraper, storage.Appender, storage.Appender) loop
newLoop func(context.Context, scraper, func() storage.Appender, func() storage.Appender) loop
}
func newScrapePool(ctx context.Context, cfg *config.ScrapeConfig, app Appendable) *scrapePool {
@ -171,7 +171,14 @@ func (sp *scrapePool) reload(cfg *config.ScrapeConfig) {
var (
t = sp.targets[fp]
s = &targetScraper{Target: t, client: sp.client}
newLoop = sp.newLoop(sp.ctx, s, sp.sampleAppender(t), sp.reportAppender(t))
newLoop = sp.newLoop(sp.ctx, s,
func() storage.Appender {
return sp.sampleAppender(t)
},
func() storage.Appender {
return sp.reportAppender(t)
},
)
)
wg.Add(1)
@ -227,12 +234,20 @@ func (sp *scrapePool) sync(targets []*Target) {
)
for _, t := range targets {
t := t
hash := t.hash()
uniqueTargets[hash] = struct{}{}
if _, ok := sp.targets[hash]; !ok {
s := &targetScraper{Target: t, client: sp.client}
l := sp.newLoop(sp.ctx, s, sp.sampleAppender(t), sp.reportAppender(t))
l := sp.newLoop(sp.ctx, s,
func() storage.Appender {
return sp.sampleAppender(t)
},
func() storage.Appender {
return sp.reportAppender(t)
},
)
sp.targets[hash] = t
sp.loops[hash] = l
@ -378,15 +393,15 @@ type loop interface {
type scrapeLoop struct {
scraper scraper
appender storage.Appender
reportAppender storage.Appender
appender func() storage.Appender
reportAppender func() storage.Appender
done chan struct{}
ctx context.Context
cancel func()
}
func newScrapeLoop(ctx context.Context, sc scraper, app, reportApp storage.Appender) loop {
func newScrapeLoop(ctx context.Context, sc scraper, app, reportApp func() storage.Appender) loop {
sl := &scrapeLoop{
scraper: sc,
appender: app,
@ -481,9 +496,15 @@ func (sl *scrapeLoop) append(samples samples) {
numOutOfOrder = 0
numDuplicates = 0
)
app := sl.appender()
for _, s := range samples {
if err := sl.appender.Add(s.metric, s.t, s.v); err != nil {
ref, err := app.SetSeries(s.metric)
if err != nil {
log.With("sample", s).With("error", err).Debug("Setting metric failed")
continue
}
if err := app.Add(ref, s.t, s.v); err != nil {
switch err {
case storage.ErrOutOfOrderSample:
numOutOfOrder++
@ -503,7 +524,7 @@ func (sl *scrapeLoop) append(samples samples) {
log.With("numDropped", numDuplicates).Warn("Error on ingesting samples with different value but same timestamp")
}
if err := sl.appender.Commit(); err != nil {
if err := app.Commit(); err != nil {
log.With("err", err).Warn("Error commiting scrape")
}
}
@ -518,6 +539,8 @@ func (sl *scrapeLoop) report(start time.Time, duration time.Duration, scrapedSam
health = 1
}
app := sl.reportAppender()
var (
healthMet = labels.Labels{
labels.Label{Name: labels.MetricName, Value: scrapeHealthMetricName},
@ -530,17 +553,29 @@ func (sl *scrapeLoop) report(start time.Time, duration time.Duration, scrapedSam
}
)
if err := sl.reportAppender.Add(healthMet, ts, health); err != nil {
ref, err := app.SetSeries(healthMet)
if err != nil {
panic(err)
}
if err := app.Add(ref, ts, health); err != nil {
log.With("err", err).Warn("Scrape health sample discarded")
}
if err := sl.reportAppender.Add(durationMet, ts, duration.Seconds()); err != nil {
ref, err = app.SetSeries(durationMet)
if err != nil {
panic(err)
}
if err := app.Add(ref, ts, duration.Seconds()); err != nil {
log.With("err", err).Warn("Scrape duration sample discarded")
}
if err := sl.reportAppender.Add(countMet, ts, float64(scrapedSamples)); err != nil {
ref, err = app.SetSeries(countMet)
if err != nil {
panic(err)
}
if err := app.Add(ref, ts, float64(scrapedSamples)); err != nil {
log.With("err", err).Warn("Scrape sample count sample discarded")
}
if err := sl.reportAppender.Commit(); err != nil {
if err := app.Commit(); err != nil {
log.With("err", err).Warn("Commiting report samples failed")
}
}

View File

@ -139,7 +139,7 @@ func TestScrapePoolReload(t *testing.T) {
}
// On starting to run, new loops created on reload check whether their preceding
// equivalents have been stopped.
newLoop := func(ctx context.Context, s scraper, app, reportApp storage.Appender) loop {
newLoop := func(ctx context.Context, s scraper, app, reportApp func() storage.Appender) loop {
l := &testLoop{}
l.startFunc = func(interval, timeout time.Duration, errc chan<- error) {
if interval != 3*time.Second {
@ -351,8 +351,8 @@ func TestScrapeLoopRun(t *testing.T) {
errc = make(chan error)
scraper = &testScraper{}
app = &nopAppender{}
reportApp = &nopAppender{}
app = func() storage.Appender { return &nopAppender{} }
reportApp = func() storage.Appender { return &nopAppender{} }
)
defer close(signal)

View File

@ -14,6 +14,7 @@
package retrieval
import (
"errors"
"fmt"
"hash/fnv"
"io/ioutil"
@ -233,7 +234,7 @@ type ruleLabelsAppender struct {
labels labels.Labels
}
func (app ruleLabelsAppender) Add(lset labels.Labels, t int64, v float64) error {
func (app ruleLabelsAppender) SetSeries(lset labels.Labels) (uint64, error) {
lb := labels.NewBuilder(lset)
for _, l := range app.labels {
@ -244,7 +245,7 @@ func (app ruleLabelsAppender) Add(lset labels.Labels, t int64, v float64) error
lb.Set(l.Name, l.Value)
}
return app.Appender.Add(lb.Labels(), t, v)
return app.Appender.SetSeries(lb.Labels())
}
type honorLabelsAppender struct {
@ -255,7 +256,7 @@ type honorLabelsAppender struct {
// Merges the sample's metric with the given labels if the label is not
// already present in the metric.
// This also considers labels explicitly set to the empty string.
func (app honorLabelsAppender) Add(lset labels.Labels, t int64, v float64) error {
func (app honorLabelsAppender) SetSeries(lset labels.Labels) (uint64, error) {
lb := labels.NewBuilder(lset)
for _, l := range app.labels {
@ -263,8 +264,7 @@ func (app honorLabelsAppender) Add(lset labels.Labels, t int64, v float64) error
lb.Set(l.Name, l.Value)
}
}
return app.Appender.Add(lb.Labels(), t, v)
return app.Appender.SetSeries(lb.Labels())
}
// Applies a set of relabel configurations to the sample's metric
@ -274,14 +274,14 @@ type relabelAppender struct {
relabelings []*config.RelabelConfig
}
func (app relabelAppender) Add(lset labels.Labels, t int64, v float64) error {
lset = relabel.Process(lset, app.relabelings...)
var errSeriesDropped = errors.New("series dropped")
// Check if the timeseries was dropped.
func (app relabelAppender) SetSeries(lset labels.Labels) (uint64, error) {
lset = relabel.Process(lset, app.relabelings...)
if lset == nil {
return nil
return 0, errSeriesDropped
}
return app.Appender.Add(lset, t, v)
return app.Appender.SetSeries(lset)
}
// populateLabels builds a label set from the given label set and scrape configuration.

View File

@ -275,8 +275,19 @@ func (g *Group) Eval() {
numDuplicates = 0
)
app, err := g.opts.Appendable.Appender()
if err != nil {
log.With("err", err).Warn("creating appender failed")
return
}
for _, s := range vector {
if err := g.opts.SampleAppender.Add(s.Metric, s.T, s.V); err != nil {
ref, err := app.SetSeries(s.Metric)
if err != nil {
log.With("sample", s).With("error", err).Warn("Setting metric failed")
continue
}
if err := app.Add(ref, s.T, s.V); err != nil {
switch err {
case storage.ErrOutOfOrderSample:
numOutOfOrder++
@ -295,7 +306,7 @@ func (g *Group) Eval() {
if numDuplicates > 0 {
log.With("numDropped", numDuplicates).Warn("Error on ingesting results from rule evaluation with different value but same timestamp")
}
if err := g.opts.SampleAppender.Commit(); err != nil {
if err := app.Commit(); err != nil {
log.With("err", err).Warn("rule sample appending failed")
}
}(rule)
@ -341,13 +352,17 @@ type Manager struct {
block chan struct{}
}
type Appendable interface {
Appender() (storage.Appender, error)
}
// ManagerOptions bundles options for the Manager.
type ManagerOptions struct {
ExternalURL *url.URL
QueryEngine *promql.Engine
Context context.Context
Notifier *notifier.Notifier
SampleAppender storage.Appender
ExternalURL *url.URL
QueryEngine *promql.Engine
Context context.Context
Notifier *notifier.Notifier
Appendable Appendable
}
// NewManager returns an implementation of Manager, ready to be started

View File

@ -51,11 +51,15 @@ type Querier interface {
// Appender provides batched appends against a storage.
type Appender interface {
SetSeries(labels.Labels) (uint64, error)
// Add adds a sample pair for the referenced series.
Add(lset labels.Labels, t int64, v float64) error
Add(ref uint64, t int64, v float64) error
// Commit submits the collected samples and purges the batch.
Commit() error
Rollback() error
}
// SeriesSet contains a set of series.

View File

@ -73,11 +73,16 @@ type appender struct {
a tsdb.Appender
}
func (a appender) Add(lset labels.Labels, t int64, v float64) error {
return a.a.Add(toTSDBLabels(lset), t, v)
func (a appender) SetSeries(lset labels.Labels) (uint64, error) {
return a.a.SetSeries(toTSDBLabels(lset))
}
func (a appender) Commit() error { return a.a.Commit() }
func (a appender) Add(ref uint64, t int64, v float64) error {
return a.a.Add(ref, t, v)
}
func (a appender) Commit() error { return a.a.Commit() }
func (a appender) Rollback() error { return a.a.Rollback() }
func convertMatcher(m *labels.Matcher) tsdbLabels.Matcher {
switch m.Type {