diff --git a/cmd/prometheus/main.go b/cmd/prometheus/main.go index acdf7acd9..0f5b21e45 100644 --- a/cmd/prometheus/main.go +++ b/cmd/prometheus/main.go @@ -84,12 +84,6 @@ func Main() int { return 1 } - sampleAppender, err := localStorage.Appender() - if err != nil { - log.Errorf("Creating sample appender failed: %s", err) - return 1 - } - // reloadableRemoteStorage := remote.New() // sampleAppender = append(sampleAppender, reloadableRemoteStorage) // reloadables = append(reloadables, reloadableRemoteStorage) @@ -102,11 +96,11 @@ func Main() int { ) ruleManager := rules.NewManager(&rules.ManagerOptions{ - SampleAppender: sampleAppender, - Notifier: notifier, - QueryEngine: queryEngine, - Context: ctx, - ExternalURL: cfg.web.ExternalURL, + Appendable: localStorage, + Notifier: notifier, + QueryEngine: queryEngine, + Context: ctx, + ExternalURL: cfg.web.ExternalURL, }) cfg.web.Context = ctx diff --git a/promql/functions.go b/promql/functions.go index 6aca50601..b66e008c9 100644 --- a/promql/functions.go +++ b/promql/functions.go @@ -14,6 +14,7 @@ package promql import ( + "fmt" "math" "regexp" "sort" @@ -724,6 +725,8 @@ func funcHistogramQuantile(ev *evaluator, args Expressions) Value { q := ev.evalFloat(args[0]) inVec := ev.evalVector(args[1]) + fmt.Println("invec", inVec) + outVec := Vector{} signatureToMetricWithBuckets := map[uint64]*metricWithBuckets{} for _, el := range inVec { diff --git a/promql/test.go b/promql/test.go index 8db392d62..72f0f272d 100644 --- a/promql/test.go +++ b/promql/test.go @@ -280,14 +280,21 @@ func (cmd *loadCmd) set(m labels.Labels, vals ...sequenceValue) { } // append the defined time series to the storage. -func (cmd *loadCmd) append(a storage.Appender) { +func (cmd *loadCmd) append(a storage.Appender) error { for h, smpls := range cmd.defs { m := cmd.metrics[h] for _, s := range smpls { - a.Add(m, s.T, s.V) + ref, err := a.SetSeries(m) + if err != nil { + return err + } + if err := a.Add(ref, s.T, s.V); err != nil { + return err + } } } + return nil } // evalCmd is a command that evaluates an expression for the given time (range) @@ -473,7 +480,10 @@ func (t *Test) exec(tc testCommand) error { if err != nil { return err } - cmd.append(app) + if err := cmd.append(app); err != nil { + app.Rollback() + return err + } if err := app.Commit(); err != nil { return err diff --git a/retrieval/helpers_test.go b/retrieval/helpers_test.go index bce1d51c3..c5f1c68e9 100644 --- a/retrieval/helpers_test.go +++ b/retrieval/helpers_test.go @@ -27,31 +27,41 @@ func (a nopAppendable) Appender() (storage.Appender, error) { type nopAppender struct{} -func (a nopAppender) Add(l labels.Labels, t int64, v float64) error { return nil } -func (a nopAppender) Commit() error { return nil } +func (a nopAppender) SetSeries(labels.Labels) (uint64, error) { return 0, nil } +func (a nopAppender) Add(uint64, int64, float64) error { return nil } +func (a nopAppender) Commit() error { return nil } +func (a nopAppender) Rollback() error { return nil } type collectResultAppender struct { - result []sample - throttled bool + refs map[uint64]labels.Labels + result []sample } -func (a *collectResultAppender) Add(l labels.Labels, t int64, v float64) error { +func (a *collectResultAppender) SetSeries(l labels.Labels) (uint64, error) { + if a.refs == nil { + a.refs = map[uint64]labels.Labels{} + } + ref := uint64(len(a.refs)) + a.refs[ref] = l + return ref, nil +} + +func (a *collectResultAppender) Add(ref uint64, t int64, v float64) error { // for ln, lv := range s.Metric { // if len(lv) == 0 { // delete(s.Metric, ln) // } // } a.result = append(a.result, sample{ - metric: l, + metric: a.refs[ref], t: t, v: v, }) return nil } -func (a *collectResultAppender) Commit() error { - return nil -} +func (a *collectResultAppender) Commit() error { return nil } +func (a *collectResultAppender) Rollback() error { return nil } // fakeTargetProvider implements a TargetProvider and allows manual injection // of TargetGroups through the update channel. diff --git a/retrieval/scrape.go b/retrieval/scrape.go index 0f9970431..e41f615fb 100644 --- a/retrieval/scrape.go +++ b/retrieval/scrape.go @@ -102,7 +102,7 @@ type scrapePool struct { loops map[uint64]loop // Constructor for new scrape loops. This is settable for testing convenience. - newLoop func(context.Context, scraper, storage.Appender, storage.Appender) loop + newLoop func(context.Context, scraper, func() storage.Appender, func() storage.Appender) loop } func newScrapePool(ctx context.Context, cfg *config.ScrapeConfig, app Appendable) *scrapePool { @@ -171,7 +171,14 @@ func (sp *scrapePool) reload(cfg *config.ScrapeConfig) { var ( t = sp.targets[fp] s = &targetScraper{Target: t, client: sp.client} - newLoop = sp.newLoop(sp.ctx, s, sp.sampleAppender(t), sp.reportAppender(t)) + newLoop = sp.newLoop(sp.ctx, s, + func() storage.Appender { + return sp.sampleAppender(t) + }, + func() storage.Appender { + return sp.reportAppender(t) + }, + ) ) wg.Add(1) @@ -227,12 +234,20 @@ func (sp *scrapePool) sync(targets []*Target) { ) for _, t := range targets { + t := t hash := t.hash() uniqueTargets[hash] = struct{}{} if _, ok := sp.targets[hash]; !ok { s := &targetScraper{Target: t, client: sp.client} - l := sp.newLoop(sp.ctx, s, sp.sampleAppender(t), sp.reportAppender(t)) + l := sp.newLoop(sp.ctx, s, + func() storage.Appender { + return sp.sampleAppender(t) + }, + func() storage.Appender { + return sp.reportAppender(t) + }, + ) sp.targets[hash] = t sp.loops[hash] = l @@ -378,15 +393,15 @@ type loop interface { type scrapeLoop struct { scraper scraper - appender storage.Appender - reportAppender storage.Appender + appender func() storage.Appender + reportAppender func() storage.Appender done chan struct{} ctx context.Context cancel func() } -func newScrapeLoop(ctx context.Context, sc scraper, app, reportApp storage.Appender) loop { +func newScrapeLoop(ctx context.Context, sc scraper, app, reportApp func() storage.Appender) loop { sl := &scrapeLoop{ scraper: sc, appender: app, @@ -481,9 +496,15 @@ func (sl *scrapeLoop) append(samples samples) { numOutOfOrder = 0 numDuplicates = 0 ) + app := sl.appender() for _, s := range samples { - if err := sl.appender.Add(s.metric, s.t, s.v); err != nil { + ref, err := app.SetSeries(s.metric) + if err != nil { + log.With("sample", s).With("error", err).Debug("Setting metric failed") + continue + } + if err := app.Add(ref, s.t, s.v); err != nil { switch err { case storage.ErrOutOfOrderSample: numOutOfOrder++ @@ -503,7 +524,7 @@ func (sl *scrapeLoop) append(samples samples) { log.With("numDropped", numDuplicates).Warn("Error on ingesting samples with different value but same timestamp") } - if err := sl.appender.Commit(); err != nil { + if err := app.Commit(); err != nil { log.With("err", err).Warn("Error commiting scrape") } } @@ -518,6 +539,8 @@ func (sl *scrapeLoop) report(start time.Time, duration time.Duration, scrapedSam health = 1 } + app := sl.reportAppender() + var ( healthMet = labels.Labels{ labels.Label{Name: labels.MetricName, Value: scrapeHealthMetricName}, @@ -530,17 +553,29 @@ func (sl *scrapeLoop) report(start time.Time, duration time.Duration, scrapedSam } ) - if err := sl.reportAppender.Add(healthMet, ts, health); err != nil { + ref, err := app.SetSeries(healthMet) + if err != nil { + panic(err) + } + if err := app.Add(ref, ts, health); err != nil { log.With("err", err).Warn("Scrape health sample discarded") } - if err := sl.reportAppender.Add(durationMet, ts, duration.Seconds()); err != nil { + ref, err = app.SetSeries(durationMet) + if err != nil { + panic(err) + } + if err := app.Add(ref, ts, duration.Seconds()); err != nil { log.With("err", err).Warn("Scrape duration sample discarded") } - if err := sl.reportAppender.Add(countMet, ts, float64(scrapedSamples)); err != nil { + ref, err = app.SetSeries(countMet) + if err != nil { + panic(err) + } + if err := app.Add(ref, ts, float64(scrapedSamples)); err != nil { log.With("err", err).Warn("Scrape sample count sample discarded") } - if err := sl.reportAppender.Commit(); err != nil { + if err := app.Commit(); err != nil { log.With("err", err).Warn("Commiting report samples failed") } } diff --git a/retrieval/scrape_test.go b/retrieval/scrape_test.go index 788fc5d7f..1ff14254b 100644 --- a/retrieval/scrape_test.go +++ b/retrieval/scrape_test.go @@ -139,7 +139,7 @@ func TestScrapePoolReload(t *testing.T) { } // On starting to run, new loops created on reload check whether their preceding // equivalents have been stopped. - newLoop := func(ctx context.Context, s scraper, app, reportApp storage.Appender) loop { + newLoop := func(ctx context.Context, s scraper, app, reportApp func() storage.Appender) loop { l := &testLoop{} l.startFunc = func(interval, timeout time.Duration, errc chan<- error) { if interval != 3*time.Second { @@ -351,8 +351,8 @@ func TestScrapeLoopRun(t *testing.T) { errc = make(chan error) scraper = &testScraper{} - app = &nopAppender{} - reportApp = &nopAppender{} + app = func() storage.Appender { return &nopAppender{} } + reportApp = func() storage.Appender { return &nopAppender{} } ) defer close(signal) diff --git a/retrieval/target.go b/retrieval/target.go index 4d93ad845..ccf8c12d9 100644 --- a/retrieval/target.go +++ b/retrieval/target.go @@ -14,6 +14,7 @@ package retrieval import ( + "errors" "fmt" "hash/fnv" "io/ioutil" @@ -233,7 +234,7 @@ type ruleLabelsAppender struct { labels labels.Labels } -func (app ruleLabelsAppender) Add(lset labels.Labels, t int64, v float64) error { +func (app ruleLabelsAppender) SetSeries(lset labels.Labels) (uint64, error) { lb := labels.NewBuilder(lset) for _, l := range app.labels { @@ -244,7 +245,7 @@ func (app ruleLabelsAppender) Add(lset labels.Labels, t int64, v float64) error lb.Set(l.Name, l.Value) } - return app.Appender.Add(lb.Labels(), t, v) + return app.Appender.SetSeries(lb.Labels()) } type honorLabelsAppender struct { @@ -255,7 +256,7 @@ type honorLabelsAppender struct { // Merges the sample's metric with the given labels if the label is not // already present in the metric. // This also considers labels explicitly set to the empty string. -func (app honorLabelsAppender) Add(lset labels.Labels, t int64, v float64) error { +func (app honorLabelsAppender) SetSeries(lset labels.Labels) (uint64, error) { lb := labels.NewBuilder(lset) for _, l := range app.labels { @@ -263,8 +264,7 @@ func (app honorLabelsAppender) Add(lset labels.Labels, t int64, v float64) error lb.Set(l.Name, l.Value) } } - - return app.Appender.Add(lb.Labels(), t, v) + return app.Appender.SetSeries(lb.Labels()) } // Applies a set of relabel configurations to the sample's metric @@ -274,14 +274,14 @@ type relabelAppender struct { relabelings []*config.RelabelConfig } -func (app relabelAppender) Add(lset labels.Labels, t int64, v float64) error { - lset = relabel.Process(lset, app.relabelings...) +var errSeriesDropped = errors.New("series dropped") - // Check if the timeseries was dropped. +func (app relabelAppender) SetSeries(lset labels.Labels) (uint64, error) { + lset = relabel.Process(lset, app.relabelings...) if lset == nil { - return nil + return 0, errSeriesDropped } - return app.Appender.Add(lset, t, v) + return app.Appender.SetSeries(lset) } // populateLabels builds a label set from the given label set and scrape configuration. diff --git a/rules/manager.go b/rules/manager.go index 7ad433d2f..5c3ebbfb2 100644 --- a/rules/manager.go +++ b/rules/manager.go @@ -275,8 +275,19 @@ func (g *Group) Eval() { numDuplicates = 0 ) + app, err := g.opts.Appendable.Appender() + if err != nil { + log.With("err", err).Warn("creating appender failed") + return + } + for _, s := range vector { - if err := g.opts.SampleAppender.Add(s.Metric, s.T, s.V); err != nil { + ref, err := app.SetSeries(s.Metric) + if err != nil { + log.With("sample", s).With("error", err).Warn("Setting metric failed") + continue + } + if err := app.Add(ref, s.T, s.V); err != nil { switch err { case storage.ErrOutOfOrderSample: numOutOfOrder++ @@ -295,7 +306,7 @@ func (g *Group) Eval() { if numDuplicates > 0 { log.With("numDropped", numDuplicates).Warn("Error on ingesting results from rule evaluation with different value but same timestamp") } - if err := g.opts.SampleAppender.Commit(); err != nil { + if err := app.Commit(); err != nil { log.With("err", err).Warn("rule sample appending failed") } }(rule) @@ -341,13 +352,17 @@ type Manager struct { block chan struct{} } +type Appendable interface { + Appender() (storage.Appender, error) +} + // ManagerOptions bundles options for the Manager. type ManagerOptions struct { - ExternalURL *url.URL - QueryEngine *promql.Engine - Context context.Context - Notifier *notifier.Notifier - SampleAppender storage.Appender + ExternalURL *url.URL + QueryEngine *promql.Engine + Context context.Context + Notifier *notifier.Notifier + Appendable Appendable } // NewManager returns an implementation of Manager, ready to be started diff --git a/storage/interface.go b/storage/interface.go index 1f6a782ec..e35b00f92 100644 --- a/storage/interface.go +++ b/storage/interface.go @@ -51,11 +51,15 @@ type Querier interface { // Appender provides batched appends against a storage. type Appender interface { + SetSeries(labels.Labels) (uint64, error) + // Add adds a sample pair for the referenced series. - Add(lset labels.Labels, t int64, v float64) error + Add(ref uint64, t int64, v float64) error // Commit submits the collected samples and purges the batch. Commit() error + + Rollback() error } // SeriesSet contains a set of series. diff --git a/storage/tsdb/tsdb.go b/storage/tsdb/tsdb.go index f0240be09..f24d5ac31 100644 --- a/storage/tsdb/tsdb.go +++ b/storage/tsdb/tsdb.go @@ -73,11 +73,16 @@ type appender struct { a tsdb.Appender } -func (a appender) Add(lset labels.Labels, t int64, v float64) error { - return a.a.Add(toTSDBLabels(lset), t, v) +func (a appender) SetSeries(lset labels.Labels) (uint64, error) { + return a.a.SetSeries(toTSDBLabels(lset)) } -func (a appender) Commit() error { return a.a.Commit() } +func (a appender) Add(ref uint64, t int64, v float64) error { + return a.a.Add(ref, t, v) +} + +func (a appender) Commit() error { return a.a.Commit() } +func (a appender) Rollback() error { return a.a.Rollback() } func convertMatcher(m *labels.Matcher) tsdbLabels.Matcher { switch m.Type {