retrieval: cache series references, use pkg/textparse

With this change, scraping caches series references and only
allocates label sets if it has to retrieve a new reference.
pkg/textparse is used for conditional parsing, which reduces
allocations from 900B/sample to 0 in the general case.
Fabian Reinartz 2017-01-15 17:33:07 +01:00
parent fb3ab9bdb7
commit c691895a0f
6 changed files with 142 additions and 141 deletions
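
The core of the change is a lookup-before-parse loop in scrapeLoop.append (see the scrape.go hunks below): the raw metric line is used as a map key to a cached series reference, and the label set is only parsed and allocated when the cache misses or the reference has gone stale. The following standalone sketch illustrates that pattern; the appender type, the ingest helper, and the error value are simplified stand-ins for storage.Appender and pkg/textparse, not the real Prometheus APIs.

package main

import (
	"errors"
	"fmt"
	"strconv"
	"strings"
)

// errNotFound plays the role of storage.ErrNotFound: the appender has
// forgotten (or never knew) the series behind a reference.
var errNotFound = errors.New("not found")

// memAppender is a toy in-memory stand-in for storage.Appender.
type memAppender struct {
	series map[uint64]string
	nextID uint64
}

// SetSeries is the slow path: in Prometheus it allocates a label set.
func (a *memAppender) SetSeries(name string) (uint64, error) {
	a.nextID++
	a.series[a.nextID] = name
	return a.nextID, nil
}

// Add is the fast path: it needs only the numeric reference.
func (a *memAppender) Add(ref uint64, t int64, v float64) error {
	if _, ok := a.series[ref]; !ok {
		return errNotFound
	}
	return nil
}

// ingest resolves each raw metric line through a reference cache and falls
// back to the allocating SetSeries path only on a miss or a stale reference.
func ingest(app *memAppender, cache map[string]uint64, lines []string, defTime int64) (n int, err error) {
	for _, line := range lines {
		fields := strings.Fields(line)
		v, _ := strconv.ParseFloat(fields[1], 64)

		ref, ok := cache[line]
		if ok {
			if err = app.Add(ref, defTime, v); err == nil {
				n++
				continue // cache hit: no label parsing, no allocation
			}
			if err != errNotFound {
				return n, err
			}
			// Stale reference: fall through and re-resolve it below.
		}
		ref, err = app.SetSeries(fields[0]) // stands in for parsing the label set
		if err != nil {
			return n, err
		}
		if err = app.Add(ref, defTime, v); err != nil {
			return n, err
		}
		cache[line] = ref
		n++
	}
	return n, nil
}

func main() {
	app := &memAppender{series: map[uint64]string{}}
	cache := map[string]uint64{}
	lines := []string{"metric_a 1", "metric_b 2"}

	// The first pass populates the cache; the second reuses the references.
	for i := 0; i < 2; i++ {
		n, err := ingest(app, cache, lines, int64(i))
		fmt.Println(n, err) // 2 <nil>
	}
}

Keying the cache on the raw scraped line means a cache hit costs one map lookup and no label-set allocation, which is where the 900B-per-sample savings in the commit message comes from.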

Binary file not shown.

Binary file not shown.


@@ -14,22 +14,22 @@
 package retrieval
 
 import (
+	"bytes"
 	"fmt"
 	"io"
 	"net/http"
 	"sync"
 	"time"
-	"unsafe"
 
 	"github.com/prometheus/client_golang/prometheus"
-	"github.com/prometheus/common/expfmt"
 	"github.com/prometheus/common/log"
-	"github.com/prometheus/common/model"
 	"golang.org/x/net/context"
 	"golang.org/x/net/context/ctxhttp"
 
 	"github.com/prometheus/prometheus/config"
 	"github.com/prometheus/prometheus/pkg/labels"
+	"github.com/prometheus/prometheus/pkg/textparse"
+	"github.com/prometheus/prometheus/pkg/timestamp"
 	"github.com/prometheus/prometheus/storage"
 )
@@ -322,7 +322,7 @@ func (sp *scrapePool) reportAppender(target *Target) storage.Appender {
 // A scraper retrieves samples and accepts a status report at the end.
 type scraper interface {
-	scrape(ctx context.Context, ts time.Time) (samples, error)
+	scrape(ctx context.Context, w io.Writer) error
 	report(start time.Time, dur time.Duration, err error)
 	offset(interval time.Duration) time.Duration
 }
@@ -335,53 +335,41 @@ type targetScraper struct {
 const acceptHeader = `application/vnd.google.protobuf;proto=io.prometheus.client.MetricFamily;encoding=delimited;q=0.7,text/plain;version=0.0.4;q=0.3,*/*;q=0.1`
 
-func (s *targetScraper) scrape(ctx context.Context, ts time.Time) (samples, error) {
+var scrapeBufPool = sync.Pool{}
+
+func getScrapeBuf() []byte {
+	b := scrapeBufPool.Get()
+	if b == nil {
+		return make([]byte, 0, 8192)
+	}
+	return b.([]byte)
+}
+
+func putScrapeBuf(b []byte) {
+	b = b[:0]
+	scrapeBufPool.Put(b)
+}
+
+func (s *targetScraper) scrape(ctx context.Context, w io.Writer) error {
 	req, err := http.NewRequest("GET", s.URL().String(), nil)
 	if err != nil {
-		return nil, err
+		return err
 	}
-	req.Header.Add("Accept", acceptHeader)
+	// Disable accept header to always negotiate for text format.
+	// req.Header.Add("Accept", acceptHeader)
 
 	resp, err := ctxhttp.Do(ctx, s.client, req)
 	if err != nil {
-		return nil, err
+		return err
 	}
 	defer resp.Body.Close()
 
 	if resp.StatusCode != http.StatusOK {
-		return nil, fmt.Errorf("server returned HTTP status %s", resp.Status)
+		return fmt.Errorf("server returned HTTP status %s", resp.Status)
 	}
 
-	var (
-		allSamples = make(samples, 0, 200)
-		decSamples = make(model.Vector, 0, 50)
-	)
-	sdec := expfmt.SampleDecoder{
-		Dec: expfmt.NewDecoder(resp.Body, expfmt.ResponseFormat(resp.Header)),
-		Opts: &expfmt.DecodeOptions{
-			Timestamp: model.TimeFromUnixNano(ts.UnixNano()),
-		},
-	}
-
-	for {
-		if err = sdec.Decode(&decSamples); err != nil {
-			break
-		}
-		for _, s := range decSamples {
-			allSamples = append(allSamples, sample{
-				metric: labels.FromMap(*(*map[string]string)(unsafe.Pointer(&s.Metric))),
-				t:      int64(s.Timestamp),
-				v:      float64(s.Value),
-			})
-		}
-		decSamples = decSamples[:0]
-	}
-	if err == io.EOF {
-		// Set err to nil since it is used in the scrape health recording.
-		err = nil
-	}
-	return allSamples, err
+	_, err = io.Copy(w, resp.Body)
+	return err
 }
 
 // A loop can run and be stopped again. It must not be reused after it was stopped.
@@ -396,6 +384,8 @@ type scrapeLoop struct {
 	appender       func() storage.Appender
 	reportAppender func() storage.Appender
 
+	cache map[string]uint64
+
 	done   chan struct{}
 	ctx    context.Context
 	cancel func()
@@ -406,6 +396,7 @@ func newScrapeLoop(ctx context.Context, sc scraper, app, reportApp func() storag
 		scraper:        sc,
 		appender:       app,
 		reportAppender: reportApp,
+		cache:          map[string]uint64{},
 		done:           make(chan struct{}),
 	}
 	sl.ctx, sl.cancel = context.WithCancel(ctx)
@@ -447,14 +438,22 @@ func (sl *scrapeLoop) run(interval, timeout time.Duration, errc chan<- error) {
 		)
 	}
 
-	samples, err := sl.scraper.scrape(scrapeCtx, start)
+	n := 0
+	buf := bytes.NewBuffer(getScrapeBuf())
+
+	err := sl.scraper.scrape(scrapeCtx, buf)
 	if err == nil {
-		sl.append(samples)
+		b := buf.Bytes()
+
+		if n, err = sl.append(b, start); err != nil {
+			log.With("err", err).Error("append failed")
+		}
+		putScrapeBuf(b)
 	} else if errc != nil {
 		errc <- err
 	}
 
-	sl.report(start, time.Since(start), len(samples), err)
+	sl.report(start, time.Since(start), n, err)
 	last = start
 
 	select {
@@ -491,48 +490,59 @@ func (s samples) Less(i, j int) bool {
 	return s[i].t < s[j].t
 }
 
-func (sl *scrapeLoop) append(samples samples) {
+func (sl *scrapeLoop) append(b []byte, ts time.Time) (n int, err error) {
 	var (
-		numOutOfOrder = 0
-		numDuplicates = 0
+		app     = sl.appender()
+		p       = textparse.New(b)
+		defTime = timestamp.FromTime(ts)
 	)
-	app := sl.appender()
-	for _, s := range samples {
-		ref, err := app.SetSeries(s.metric)
-		if err != nil {
-			log.With("sample", s).With("error", err).Debug("Setting metric failed")
-			continue
+	for p.Next() {
+		t := defTime
+		met, tp, v := p.At()
+		if tp != nil {
+			t = *tp
 		}
-		if err := app.Add(ref, s.t, s.v); err != nil {
-			switch err {
-			case storage.ErrOutOfOrderSample:
-				numOutOfOrder++
-				log.With("sample", s).With("error", err).Debug("Sample discarded")
-			case storage.ErrDuplicateSampleForTimestamp:
-				numDuplicates++
-				log.With("sample", s).With("error", err).Debug("Sample discarded")
-			default:
-				log.With("sample", s).With("error", err).Warn("Sample discarded")
+		mets := string(met)
+		ref, ok := sl.cache[mets]
+		if ok {
+			if err = app.Add(ref, t, v); err != storage.ErrNotFound {
+				break
 			}
+			ok = false
 		}
+		if !ok {
+			var lset labels.Labels
+			p.Metric(&lset)
+
+			ref, err = app.SetSeries(lset)
+			if err != nil {
+				break
+			}
+			if err = app.Add(ref, t, v); err != nil {
+				break
+			}
+		}
+		sl.cache[mets] = ref
+		n++
 	}
-	if numOutOfOrder > 0 {
-		log.With("numDropped", numOutOfOrder).Warn("Error on ingesting out-of-order samples")
+	if err == nil {
+		err = p.Err()
 	}
-	if numDuplicates > 0 {
-		log.With("numDropped", numDuplicates).Warn("Error on ingesting samples with different value but same timestamp")
+	if err != nil {
+		app.Rollback()
+		return 0, err
 	}
 	if err := app.Commit(); err != nil {
-		log.With("err", err).Warn("Error commiting scrape")
+		return 0, err
 	}
+	return n, nil
 }
 
-func (sl *scrapeLoop) report(start time.Time, duration time.Duration, scrapedSamples int, err error) {
+func (sl *scrapeLoop) report(start time.Time, duration time.Duration, scrapedSamples int, err error) error {
 	sl.scraper.report(start, duration, err)
 
-	ts := int64(model.TimeFromUnixNano(start.UnixNano()))
+	ts := timestamp.FromTime(start)
 
 	var health float64
 	if err == nil {
@@ -541,41 +551,40 @@ func (sl *scrapeLoop) report(start time.Time, duration time.Duration, scrapedSam
 	app := sl.reportAppender()
 
-	var (
-		healthMet = labels.Labels{
-			labels.Label{Name: labels.MetricName, Value: scrapeHealthMetricName},
-		}
-		durationMet = labels.Labels{
-			labels.Label{Name: labels.MetricName, Value: scrapeDurationMetricName},
-		}
-		countMet = labels.Labels{
-			labels.Label{Name: labels.MetricName, Value: scrapeSamplesMetricName},
-		}
-	)
-	ref, err := app.SetSeries(healthMet)
-	if err != nil {
-		panic(err)
+	if err := sl.addReportSample(app, scrapeHealthMetricName, ts, health); err != nil {
+		app.Rollback()
+		return err
 	}
-	if err := app.Add(ref, ts, health); err != nil {
-		log.With("err", err).Warn("Scrape health sample discarded")
+	if err := sl.addReportSample(app, scrapeDurationMetricName, ts, duration.Seconds()); err != nil {
+		app.Rollback()
+		return err
 	}
-	ref, err = app.SetSeries(durationMet)
-	if err != nil {
-		panic(err)
-	}
-	if err := app.Add(ref, ts, duration.Seconds()); err != nil {
-		log.With("err", err).Warn("Scrape duration sample discarded")
-	}
-	ref, err = app.SetSeries(countMet)
-	if err != nil {
-		panic(err)
-	}
-	if err := app.Add(ref, ts, float64(scrapedSamples)); err != nil {
-		log.With("err", err).Warn("Scrape sample count sample discarded")
-	}
-
-	if err := app.Commit(); err != nil {
-		log.With("err", err).Warn("Commiting report samples failed")
+	if err := sl.addReportSample(app, scrapeSamplesMetricName, ts, float64(scrapedSamples)); err != nil {
+		app.Rollback()
+		return err
 	}
+	return app.Commit()
+}
+
+func (sl *scrapeLoop) addReportSample(app storage.Appender, s string, t int64, v float64) error {
+	ref, ok := sl.cache[s]
+	if ok {
+		if err := app.Add(ref, t, v); err != storage.ErrNotFound {
+			return err
+		}
+	}
+	met := labels.Labels{
+		labels.Label{Name: labels.MetricName, Value: s},
+	}
+	ref, err := app.SetSeries(met)
+	if err != nil {
+		return err
+	}
+	if err = app.Add(ref, t, v); err != nil {
+		return err
+	}
+	sl.cache[s] = ref
+	return nil
+}
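
A note on the buffer handling in run above: it obtains a pooled byte slice via getScrapeBuf, wraps it in a bytes.Buffer for the scraper to fill, and returns it through putScrapeBuf, so steady-state scraping reuses one backing array instead of allocating per scrape. A minimal self-contained sketch of the same reuse pattern, standard library only (it uses sync.Pool's New field where the code above checks for nil manually):

package main

import (
	"bytes"
	"fmt"
	"sync"
)

// Pool of byte slices so the hot path does not allocate a fresh buffer per
// scrape; mirrors getScrapeBuf/putScrapeBuf in the diff above.
var bufPool = sync.Pool{
	New: func() interface{} { return make([]byte, 0, 8192) },
}

// scrapeOnce fills a pooled buffer and returns how many bytes were written.
func scrapeOnce(body string) int {
	buf := bytes.NewBuffer(bufPool.Get().([]byte))
	buf.WriteString(body) // stands in for io.Copy(buf, resp.Body)
	n := buf.Len()
	bufPool.Put(buf.Bytes()[:0]) // recycle the (possibly grown) backing array
	return n
}

func main() {
	fmt.Println(scrapeOnce("metric_a 1\nmetric_b 2\n")) // 22
}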


@@ -14,23 +14,25 @@
 package retrieval
 
 import (
+	"bytes"
 	"fmt"
+	"io"
+	"io/ioutil"
 	"net/http"
 	"net/http/httptest"
 	"net/url"
-	"reflect"
-	"sort"
 	"strings"
 	"sync"
 	"testing"
 	"time"
 
 	"github.com/prometheus/common/model"
+	"github.com/stretchr/testify/require"
 	"golang.org/x/net/context"
 
 	"github.com/prometheus/prometheus/config"
 	"github.com/prometheus/prometheus/pkg/labels"
 	"github.com/prometheus/prometheus/pkg/timestamp"
 	"github.com/prometheus/prometheus/storage"
 )
@@ -321,9 +323,9 @@ func TestScrapeLoopStop(t *testing.T) {
 	}
 
 	// Running the scrape loop must exit before calling the scraper even once.
-	scraper.scrapeFunc = func(context.Context, time.Time) (samples, error) {
+	scraper.scrapeFunc = func(context.Context, io.Writer) error {
 		t.Fatalf("scraper was called for terminated scrape loop")
-		return nil, nil
+		return nil
 	}
 
 	runDone := make(chan struct{})
@@ -385,13 +387,13 @@ func TestScrapeLoopRun(t *testing.T) {
 	scraper.offsetDur = 0
 
 	block := make(chan struct{})
-	scraper.scrapeFunc = func(ctx context.Context, ts time.Time) (samples, error) {
+	scraper.scrapeFunc = func(ctx context.Context, _ io.Writer) error {
 		select {
 		case <-block:
 		case <-ctx.Done():
-			return nil, ctx.Err()
+			return ctx.Err()
 		}
-		return nil, nil
+		return nil
 	}
 
 	ctx, cancel = context.WithCancel(context.Background())
@@ -450,33 +452,12 @@ func TestTargetScraperScrapeOK(t *testing.T) {
 		},
 		client: http.DefaultClient,
 	}
+	var buf bytes.Buffer
 
-	now := time.Now()
-
-	smpls, err := ts.scrape(context.Background(), now)
-	if err != nil {
+	if err := ts.scrape(context.Background(), &buf); err != nil {
 		t.Fatalf("Unexpected scrape error: %s", err)
 	}
 
-	expectedSamples := samples{
-		sample{
-			metric: labels.FromStrings(labels.MetricName, "metric_a"),
-			t:      timestamp.FromTime(now),
-			v:      1,
-		},
-		sample{
-			metric: labels.FromStrings(labels.MetricName, "metric_b"),
-			t:      timestamp.FromTime(now),
-			v:      2,
-		},
-	}
-	sort.Sort(expectedSamples)
-	sort.Sort(smpls)
-
-	if !reflect.DeepEqual(smpls, expectedSamples) {
-		t.Errorf("Scraped samples did not match served metrics")
-		t.Errorf("Expected: %v", expectedSamples)
-		t.Fatalf("Got: %v", smpls)
-	}
+	require.Equal(t, "metric_a 1\nmetric_b 2\n", buf.String())
 }
 
 func TestTargetScrapeScrapeCancel(t *testing.T) {
@@ -513,7 +494,7 @@ func TestTargetScrapeScrapeCancel(t *testing.T) {
 	}()
 
 	go func() {
-		if _, err := ts.scrape(ctx, time.Now()); err != context.Canceled {
+		if err := ts.scrape(ctx, ioutil.Discard); err != context.Canceled {
 			errc <- fmt.Errorf("Expected context cancelation error but got: %s", err)
 		}
 		close(errc)
@@ -555,7 +536,7 @@ func TestTargetScrapeScrapeNotFound(t *testing.T) {
 		client: http.DefaultClient,
 	}
 
-	if _, err := ts.scrape(context.Background(), time.Now()); !strings.Contains(err.Error(), "404") {
+	if err := ts.scrape(context.Background(), ioutil.Discard); !strings.Contains(err.Error(), "404") {
 		t.Fatalf("Expected \"404 NotFound\" error but got: %s", err)
 	}
 }
@@ -571,7 +552,7 @@ type testScraper struct {
 	samples   samples
 	scrapeErr error
 
-	scrapeFunc func(context.Context, time.Time) (samples, error)
+	scrapeFunc func(context.Context, io.Writer) error
 }
 
 func (ts *testScraper) offset(interval time.Duration) time.Duration {
@@ -584,9 +565,9 @@ func (ts *testScraper) report(start time.Time, duration time.Duration, err error
 	ts.lastError = err
 }
 
-func (ts *testScraper) scrape(ctx context.Context, t time.Time) (samples, error) {
+func (ts *testScraper) scrape(ctx context.Context, w io.Writer) error {
 	if ts.scrapeFunc != nil {
-		return ts.scrapeFunc(ctx, t)
+		return ts.scrapeFunc(ctx, w)
 	}
-	return ts.samples, ts.scrapeErr
+	return ts.scrapeErr
 }


@@ -20,6 +20,7 @@ import (
 )
 
 var (
+	ErrNotFound                    = errors.New("not found")
 	ErrOutOfOrderSample            = errors.New("out of order sample")
 	ErrDuplicateSampleForTimestamp = errors.New("duplicate sample for timestamp")
 )


@@ -78,7 +78,17 @@ func (a appender) SetSeries(lset labels.Labels) (uint64, error) {
 }
 
 func (a appender) Add(ref uint64, t int64, v float64) error {
-	return a.a.Add(ref, t, v)
+	err := a.a.Add(ref, t, v)
+
+	switch err {
+	case tsdb.ErrNotFound:
+		return storage.ErrNotFound
+	case tsdb.ErrOutOfOrderSample:
+		return storage.ErrOutOfOrderSample
+	case tsdb.ErrAmendSample:
+		return storage.ErrDuplicateSampleForTimestamp
+	}
+	return err
 }
 
 func (a appender) Commit() error { return a.a.Commit() }
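
The error mapping in this last hunk is what makes the reference cache safe: scrapeLoop.append and addReportSample compare the returned error against storage.ErrNotFound with ==, so the tsdb-internal sentinels must be translated at the storage.Appender boundary, or a stale cached reference could never be detected and re-resolved. A small sketch of that adapter idea, with illustrative stand-in types and error values rather than the real tsdb API:

package main

import (
	"errors"
	"fmt"
)

// Stand-ins for the two error namespaces being bridged.
var (
	tsdbErrNotFound    = errors.New("tsdb: not found")
	storageErrNotFound = errors.New("not found")
)

// tsdbAppender fails with its package-internal sentinel for unknown refs.
type tsdbAppender struct{ known map[uint64]bool }

func (a tsdbAppender) Add(ref uint64, t int64, v float64) error {
	if !a.known[ref] {
		return tsdbErrNotFound
	}
	return nil
}

// adapter rewrites tsdb sentinels into the storage-level errors that the
// scrape loop compares against with ==.
type adapter struct{ a tsdbAppender }

func (ad adapter) Add(ref uint64, t int64, v float64) error {
	err := ad.a.Add(ref, t, v)
	if err == tsdbErrNotFound {
		return storageErrNotFound
	}
	return err
}

func main() {
	app := adapter{a: tsdbAppender{known: map[uint64]bool{}}}
	err := app.Add(42, 0, 1)

	// Without the mapping this comparison would fail and a stale cached
	// reference could never be re-resolved by the caller.
	fmt.Println(err == storageErrNotFound) // true
}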