1052 lines
27 KiB
Go
1052 lines
27 KiB
Go
// Copyright 2016 The Prometheus Authors
|
|
// Licensed under the Apache License, Version 2.0 (the "License");
|
|
// you may not use this file except in compliance with the License.
|
|
// You may obtain a copy of the License at
|
|
//
|
|
// http://www.apache.org/licenses/LICENSE-2.0
|
|
//
|
|
// Unless required by applicable law or agreed to in writing, software
|
|
// distributed under the License is distributed on an "AS IS" BASIS,
|
|
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
// See the License for the specific language governing permissions and
|
|
// limitations under the License.
|
|
|
|
package retrieval
|
|
|
|
import (
|
|
"bufio"
|
|
"bytes"
|
|
"compress/gzip"
|
|
"context"
|
|
"fmt"
|
|
"io"
|
|
"math"
|
|
"net/http"
|
|
"sync"
|
|
"time"
|
|
"unsafe"
|
|
|
|
"github.com/go-kit/kit/log"
|
|
"github.com/go-kit/kit/log/level"
|
|
"github.com/prometheus/client_golang/prometheus"
|
|
"github.com/prometheus/common/model"
|
|
"github.com/prometheus/common/version"
|
|
"golang.org/x/net/context/ctxhttp"
|
|
|
|
"github.com/prometheus/prometheus/config"
|
|
"github.com/prometheus/prometheus/discovery/targetgroup"
|
|
"github.com/prometheus/prometheus/pkg/labels"
|
|
"github.com/prometheus/prometheus/pkg/pool"
|
|
"github.com/prometheus/prometheus/pkg/relabel"
|
|
"github.com/prometheus/prometheus/pkg/textparse"
|
|
"github.com/prometheus/prometheus/pkg/timestamp"
|
|
"github.com/prometheus/prometheus/pkg/value"
|
|
"github.com/prometheus/prometheus/storage"
|
|
"github.com/prometheus/prometheus/util/httputil"
|
|
)
|
|
|
|
var (
|
|
targetIntervalLength = prometheus.NewSummaryVec(
|
|
prometheus.SummaryOpts{
|
|
Name: "prometheus_target_interval_length_seconds",
|
|
Help: "Actual intervals between scrapes.",
|
|
Objectives: map[float64]float64{0.01: 0.001, 0.05: 0.005, 0.5: 0.05, 0.90: 0.01, 0.99: 0.001},
|
|
},
|
|
[]string{"interval"},
|
|
)
|
|
targetReloadIntervalLength = prometheus.NewSummaryVec(
|
|
prometheus.SummaryOpts{
|
|
Name: "prometheus_target_reload_length_seconds",
|
|
Help: "Actual interval to reload the scrape pool with a given configuration.",
|
|
Objectives: map[float64]float64{0.01: 0.001, 0.05: 0.005, 0.5: 0.05, 0.90: 0.01, 0.99: 0.001},
|
|
},
|
|
[]string{"interval"},
|
|
)
|
|
targetSyncIntervalLength = prometheus.NewSummaryVec(
|
|
prometheus.SummaryOpts{
|
|
Name: "prometheus_target_sync_length_seconds",
|
|
Help: "Actual interval to sync the scrape pool.",
|
|
Objectives: map[float64]float64{0.01: 0.001, 0.05: 0.005, 0.5: 0.05, 0.90: 0.01, 0.99: 0.001},
|
|
},
|
|
[]string{"scrape_job"},
|
|
)
|
|
targetScrapePoolSyncsCounter = prometheus.NewCounterVec(
|
|
prometheus.CounterOpts{
|
|
Name: "prometheus_target_scrape_pool_sync_total",
|
|
Help: "Total number of syncs that were executed on a scrape pool.",
|
|
},
|
|
[]string{"scrape_job"},
|
|
)
|
|
targetScrapeSampleLimit = prometheus.NewCounter(
|
|
prometheus.CounterOpts{
|
|
Name: "prometheus_target_scrapes_exceeded_sample_limit_total",
|
|
Help: "Total number of scrapes that hit the sample limit and were rejected.",
|
|
},
|
|
)
|
|
targetScrapeSampleDuplicate = prometheus.NewCounter(
|
|
prometheus.CounterOpts{
|
|
Name: "prometheus_target_scrapes_sample_duplicate_timestamp_total",
|
|
Help: "Total number of samples rejected due to duplicate timestamps but different values",
|
|
},
|
|
)
|
|
targetScrapeSampleOutOfOrder = prometheus.NewCounter(
|
|
prometheus.CounterOpts{
|
|
Name: "prometheus_target_scrapes_sample_out_of_order_total",
|
|
Help: "Total number of samples rejected due to not being out of the expected order",
|
|
},
|
|
)
|
|
targetScrapeSampleOutOfBounds = prometheus.NewCounter(
|
|
prometheus.CounterOpts{
|
|
Name: "prometheus_target_scrapes_sample_out_of_bounds_total",
|
|
Help: "Total number of samples rejected due to timestamp falling outside of the time bounds",
|
|
},
|
|
)
|
|
)
|
|
|
|
func init() {
|
|
prometheus.MustRegister(targetIntervalLength)
|
|
prometheus.MustRegister(targetReloadIntervalLength)
|
|
prometheus.MustRegister(targetSyncIntervalLength)
|
|
prometheus.MustRegister(targetScrapePoolSyncsCounter)
|
|
prometheus.MustRegister(targetScrapeSampleLimit)
|
|
prometheus.MustRegister(targetScrapeSampleDuplicate)
|
|
prometheus.MustRegister(targetScrapeSampleOutOfOrder)
|
|
prometheus.MustRegister(targetScrapeSampleOutOfBounds)
|
|
}
|
|
|
|
// scrapePool manages scrapes for sets of targets.
|
|
type scrapePool struct {
|
|
appendable Appendable
|
|
logger log.Logger
|
|
|
|
mtx sync.RWMutex
|
|
config *config.ScrapeConfig
|
|
client *http.Client
|
|
// Targets and loops must always be synchronized to have the same
|
|
// set of hashes.
|
|
targets map[uint64]*Target
|
|
droppedTargets []*Target
|
|
loops map[uint64]loop
|
|
cancel context.CancelFunc
|
|
|
|
// Constructor for new scrape loops. This is settable for testing convenience.
|
|
newLoop func(*Target, scraper) loop
|
|
}
|
|
|
|
const maxAheadTime = 10 * time.Minute
|
|
|
|
type labelsMutator func(labels.Labels) labels.Labels
|
|
|
|
func newScrapePool(cfg *config.ScrapeConfig, app Appendable, logger log.Logger) *scrapePool {
|
|
if logger == nil {
|
|
logger = log.NewNopLogger()
|
|
}
|
|
|
|
client, err := httputil.NewClientFromConfig(cfg.HTTPClientConfig, cfg.JobName)
|
|
if err != nil {
|
|
// Any errors that could occur here should be caught during config validation.
|
|
level.Error(logger).Log("msg", "Error creating HTTP client", "err", err)
|
|
}
|
|
|
|
buffers := pool.NewBytesPool(163, 100e6, 3)
|
|
|
|
ctx, cancel := context.WithCancel(context.Background())
|
|
sp := &scrapePool{
|
|
cancel: cancel,
|
|
appendable: app,
|
|
config: cfg,
|
|
client: client,
|
|
targets: map[uint64]*Target{},
|
|
loops: map[uint64]loop{},
|
|
logger: logger,
|
|
}
|
|
sp.newLoop = func(t *Target, s scraper) loop {
|
|
return newScrapeLoop(
|
|
ctx,
|
|
s,
|
|
log.With(logger, "target", t),
|
|
buffers,
|
|
func(l labels.Labels) labels.Labels { return sp.mutateSampleLabels(l, t) },
|
|
func(l labels.Labels) labels.Labels { return sp.mutateReportSampleLabels(l, t) },
|
|
sp.appender,
|
|
)
|
|
}
|
|
|
|
return sp
|
|
}
|
|
|
|
// stop terminates all scrape loops and returns after they all terminated.
|
|
func (sp *scrapePool) stop() {
|
|
sp.cancel()
|
|
var wg sync.WaitGroup
|
|
|
|
sp.mtx.Lock()
|
|
defer sp.mtx.Unlock()
|
|
|
|
for fp, l := range sp.loops {
|
|
wg.Add(1)
|
|
|
|
go func(l loop) {
|
|
l.stop()
|
|
wg.Done()
|
|
}(l)
|
|
|
|
delete(sp.loops, fp)
|
|
delete(sp.targets, fp)
|
|
}
|
|
wg.Wait()
|
|
}
|
|
|
|
// reload the scrape pool with the given scrape configuration. The target state is preserved
|
|
// but all scrape loops are restarted with the new scrape configuration.
|
|
// This method returns after all scrape loops that were stopped have stopped scraping.
|
|
func (sp *scrapePool) reload(cfg *config.ScrapeConfig) {
|
|
start := time.Now()
|
|
|
|
sp.mtx.Lock()
|
|
defer sp.mtx.Unlock()
|
|
|
|
client, err := httputil.NewClientFromConfig(cfg.HTTPClientConfig, cfg.JobName)
|
|
if err != nil {
|
|
// Any errors that could occur here should be caught during config validation.
|
|
level.Error(sp.logger).Log("msg", "Error creating HTTP client", "err", err)
|
|
}
|
|
sp.config = cfg
|
|
sp.client = client
|
|
|
|
var (
|
|
wg sync.WaitGroup
|
|
interval = time.Duration(sp.config.ScrapeInterval)
|
|
timeout = time.Duration(sp.config.ScrapeTimeout)
|
|
)
|
|
|
|
for fp, oldLoop := range sp.loops {
|
|
var (
|
|
t = sp.targets[fp]
|
|
s = &targetScraper{Target: t, client: sp.client, timeout: timeout}
|
|
newLoop = sp.newLoop(t, s)
|
|
)
|
|
wg.Add(1)
|
|
|
|
go func(oldLoop, newLoop loop) {
|
|
oldLoop.stop()
|
|
wg.Done()
|
|
|
|
go newLoop.run(interval, timeout, nil)
|
|
}(oldLoop, newLoop)
|
|
|
|
sp.loops[fp] = newLoop
|
|
}
|
|
|
|
wg.Wait()
|
|
targetReloadIntervalLength.WithLabelValues(interval.String()).Observe(
|
|
time.Since(start).Seconds(),
|
|
)
|
|
}
|
|
|
|
// Sync converts target groups into actual scrape targets and synchronizes
|
|
// the currently running scraper with the resulting set.
|
|
func (sp *scrapePool) Sync(tgs []*targetgroup.Group) {
|
|
start := time.Now()
|
|
|
|
var all []*Target
|
|
for _, tg := range tgs {
|
|
targets, err := targetsFromGroup(tg, sp.config)
|
|
if err != nil {
|
|
level.Error(sp.logger).Log("msg", "creating targets failed", "err", err)
|
|
continue
|
|
}
|
|
for _, t := range targets {
|
|
if t.Labels().Len() > 0 {
|
|
all = append(all, t)
|
|
} else if t.DiscoveredLabels().Len() > 0 {
|
|
sp.droppedTargets = append(sp.droppedTargets, t)
|
|
}
|
|
}
|
|
}
|
|
sp.sync(all)
|
|
|
|
targetSyncIntervalLength.WithLabelValues(sp.config.JobName).Observe(
|
|
time.Since(start).Seconds(),
|
|
)
|
|
targetScrapePoolSyncsCounter.WithLabelValues(sp.config.JobName).Inc()
|
|
}
|
|
|
|
// sync takes a list of potentially duplicated targets, deduplicates them, starts
|
|
// scrape loops for new targets, and stops scrape loops for disappeared targets.
|
|
// It returns after all stopped scrape loops terminated.
|
|
func (sp *scrapePool) sync(targets []*Target) {
|
|
sp.mtx.Lock()
|
|
defer sp.mtx.Unlock()
|
|
|
|
var (
|
|
uniqueTargets = map[uint64]struct{}{}
|
|
interval = time.Duration(sp.config.ScrapeInterval)
|
|
timeout = time.Duration(sp.config.ScrapeTimeout)
|
|
)
|
|
|
|
for _, t := range targets {
|
|
t := t
|
|
hash := t.hash()
|
|
uniqueTargets[hash] = struct{}{}
|
|
|
|
if _, ok := sp.targets[hash]; !ok {
|
|
s := &targetScraper{Target: t, client: sp.client, timeout: timeout}
|
|
l := sp.newLoop(t, s)
|
|
|
|
sp.targets[hash] = t
|
|
sp.loops[hash] = l
|
|
|
|
go l.run(interval, timeout, nil)
|
|
}
|
|
}
|
|
|
|
var wg sync.WaitGroup
|
|
|
|
// Stop and remove old targets and scraper loops.
|
|
for hash := range sp.targets {
|
|
if _, ok := uniqueTargets[hash]; !ok {
|
|
wg.Add(1)
|
|
go func(l loop) {
|
|
l.stop()
|
|
wg.Done()
|
|
}(sp.loops[hash])
|
|
|
|
delete(sp.loops, hash)
|
|
delete(sp.targets, hash)
|
|
}
|
|
}
|
|
|
|
// Wait for all potentially stopped scrapers to terminate.
|
|
// This covers the case of flapping targets. If the server is under high load, a new scraper
|
|
// may be active and tries to insert. The old scraper that didn't terminate yet could still
|
|
// be inserting a previous sample set.
|
|
wg.Wait()
|
|
}
|
|
|
|
func (sp *scrapePool) mutateSampleLabels(lset labels.Labels, target *Target) labels.Labels {
|
|
lb := labels.NewBuilder(lset)
|
|
|
|
if sp.config.HonorLabels {
|
|
for _, l := range target.Labels() {
|
|
if lv := lset.Get(l.Name); lv == "" {
|
|
lb.Set(l.Name, l.Value)
|
|
}
|
|
}
|
|
} else {
|
|
for _, l := range target.Labels() {
|
|
lv := lset.Get(l.Name)
|
|
if lv != "" {
|
|
lb.Set(model.ExportedLabelPrefix+l.Name, lv)
|
|
}
|
|
lb.Set(l.Name, l.Value)
|
|
}
|
|
}
|
|
|
|
res := lb.Labels()
|
|
|
|
if mrc := sp.config.MetricRelabelConfigs; len(mrc) > 0 {
|
|
res = relabel.Process(res, mrc...)
|
|
}
|
|
|
|
return res
|
|
}
|
|
|
|
func (sp *scrapePool) mutateReportSampleLabels(lset labels.Labels, target *Target) labels.Labels {
|
|
lb := labels.NewBuilder(lset)
|
|
|
|
for _, l := range target.Labels() {
|
|
lv := lset.Get(l.Name)
|
|
if lv != "" {
|
|
lb.Set(model.ExportedLabelPrefix+l.Name, lv)
|
|
}
|
|
lb.Set(l.Name, l.Value)
|
|
}
|
|
|
|
return lb.Labels()
|
|
}
|
|
|
|
// appender returns an appender for ingested samples from the target.
|
|
func (sp *scrapePool) appender() storage.Appender {
|
|
app, err := sp.appendable.Appender()
|
|
if err != nil {
|
|
panic(err)
|
|
}
|
|
|
|
app = &timeLimitAppender{
|
|
Appender: app,
|
|
maxTime: timestamp.FromTime(time.Now().Add(maxAheadTime)),
|
|
}
|
|
|
|
// The limit is applied after metrics are potentially dropped via relabeling.
|
|
if sp.config.SampleLimit > 0 {
|
|
app = &limitAppender{
|
|
Appender: app,
|
|
limit: int(sp.config.SampleLimit),
|
|
}
|
|
}
|
|
return app
|
|
}
|
|
|
|
// A scraper retrieves samples and accepts a status report at the end.
|
|
type scraper interface {
|
|
scrape(ctx context.Context, w io.Writer) error
|
|
report(start time.Time, dur time.Duration, err error)
|
|
offset(interval time.Duration) time.Duration
|
|
}
|
|
|
|
// targetScraper implements the scraper interface for a target.
|
|
type targetScraper struct {
|
|
*Target
|
|
|
|
client *http.Client
|
|
req *http.Request
|
|
timeout time.Duration
|
|
|
|
gzipr *gzip.Reader
|
|
buf *bufio.Reader
|
|
}
|
|
|
|
const acceptHeader = `text/plain;version=0.0.4;q=1,*/*;q=0.1`
|
|
|
|
var userAgentHeader = fmt.Sprintf("Prometheus/%s", version.Version)
|
|
|
|
func (s *targetScraper) scrape(ctx context.Context, w io.Writer) error {
|
|
if s.req == nil {
|
|
req, err := http.NewRequest("GET", s.URL().String(), nil)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
req.Header.Add("Accept", acceptHeader)
|
|
req.Header.Add("Accept-Encoding", "gzip")
|
|
req.Header.Set("User-Agent", userAgentHeader)
|
|
req.Header.Set("X-Prometheus-Scrape-Timeout-Seconds", fmt.Sprintf("%f", s.timeout.Seconds()))
|
|
|
|
s.req = req
|
|
}
|
|
|
|
resp, err := ctxhttp.Do(ctx, s.client, s.req)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
defer resp.Body.Close()
|
|
|
|
if resp.StatusCode != http.StatusOK {
|
|
return fmt.Errorf("server returned HTTP status %s", resp.Status)
|
|
}
|
|
|
|
if resp.Header.Get("Content-Encoding") != "gzip" {
|
|
_, err = io.Copy(w, resp.Body)
|
|
return err
|
|
}
|
|
|
|
if s.gzipr == nil {
|
|
s.buf = bufio.NewReader(resp.Body)
|
|
s.gzipr, err = gzip.NewReader(s.buf)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
} else {
|
|
s.buf.Reset(resp.Body)
|
|
s.gzipr.Reset(s.buf)
|
|
}
|
|
|
|
_, err = io.Copy(w, s.gzipr)
|
|
s.gzipr.Close()
|
|
return err
|
|
}
|
|
|
|
// A loop can run and be stopped again. It must not be reused after it was stopped.
|
|
type loop interface {
|
|
run(interval, timeout time.Duration, errc chan<- error)
|
|
stop()
|
|
}
|
|
|
|
type cacheEntry struct {
|
|
ref uint64
|
|
lastIter uint64
|
|
hash uint64
|
|
lset labels.Labels
|
|
}
|
|
|
|
type scrapeLoop struct {
|
|
scraper scraper
|
|
l log.Logger
|
|
cache *scrapeCache
|
|
lastScrapeSize int
|
|
buffers *pool.BytesPool
|
|
|
|
appender func() storage.Appender
|
|
sampleMutator labelsMutator
|
|
reportSampleMutator labelsMutator
|
|
|
|
ctx context.Context
|
|
scrapeCtx context.Context
|
|
cancel func()
|
|
stopped chan struct{}
|
|
}
|
|
|
|
// scrapeCache tracks mappings of exposed metric strings to label sets and
|
|
// storage references. Additionally, it tracks staleness of series between
|
|
// scrapes.
|
|
type scrapeCache struct {
|
|
iter uint64 // Current scrape iteration.
|
|
|
|
// Parsed string to an entry with information about the actual label set
|
|
// and its storage reference.
|
|
entries map[string]*cacheEntry
|
|
|
|
// Cache of dropped metric strings and their iteration. The iteration must
|
|
// be a pointer so we can update it without setting a new entry with an unsafe
|
|
// string in addDropped().
|
|
dropped map[string]*uint64
|
|
|
|
// seriesCur and seriesPrev store the labels of series that were seen
|
|
// in the current and previous scrape.
|
|
// We hold two maps and swap them out to save allocations.
|
|
seriesCur map[uint64]labels.Labels
|
|
seriesPrev map[uint64]labels.Labels
|
|
}
|
|
|
|
func newScrapeCache() *scrapeCache {
|
|
return &scrapeCache{
|
|
entries: map[string]*cacheEntry{},
|
|
dropped: map[string]*uint64{},
|
|
seriesCur: map[uint64]labels.Labels{},
|
|
seriesPrev: map[uint64]labels.Labels{},
|
|
}
|
|
}
|
|
|
|
func (c *scrapeCache) iterDone() {
|
|
// refCache and lsetCache may grow over time through series churn
|
|
// or multiple string representations of the same metric. Clean up entries
|
|
// that haven't appeared in the last scrape.
|
|
for s, e := range c.entries {
|
|
if c.iter-e.lastIter > 2 {
|
|
delete(c.entries, s)
|
|
}
|
|
}
|
|
for s, iter := range c.dropped {
|
|
if c.iter-*iter > 2 {
|
|
delete(c.dropped, s)
|
|
}
|
|
}
|
|
|
|
// Swap current and previous series.
|
|
c.seriesPrev, c.seriesCur = c.seriesCur, c.seriesPrev
|
|
|
|
// We have to delete every single key in the map.
|
|
for k := range c.seriesCur {
|
|
delete(c.seriesCur, k)
|
|
}
|
|
|
|
c.iter++
|
|
}
|
|
|
|
func (c *scrapeCache) get(met string) (*cacheEntry, bool) {
|
|
e, ok := c.entries[met]
|
|
if !ok {
|
|
return nil, false
|
|
}
|
|
e.lastIter = c.iter
|
|
return e, true
|
|
}
|
|
|
|
func (c *scrapeCache) addRef(met string, ref uint64, lset labels.Labels, hash uint64) {
|
|
if ref == 0 {
|
|
return
|
|
}
|
|
c.entries[met] = &cacheEntry{ref: ref, lastIter: c.iter, lset: lset, hash: hash}
|
|
}
|
|
|
|
func (c *scrapeCache) addDropped(met string) {
|
|
iter := c.iter
|
|
c.dropped[met] = &iter
|
|
}
|
|
|
|
func (c *scrapeCache) getDropped(met string) bool {
|
|
iterp, ok := c.dropped[met]
|
|
if ok {
|
|
*iterp = c.iter
|
|
}
|
|
return ok
|
|
}
|
|
|
|
func (c *scrapeCache) trackStaleness(hash uint64, lset labels.Labels) {
|
|
c.seriesCur[hash] = lset
|
|
}
|
|
|
|
func (c *scrapeCache) forEachStale(f func(labels.Labels) bool) {
|
|
for h, lset := range c.seriesPrev {
|
|
if _, ok := c.seriesCur[h]; !ok {
|
|
if !f(lset) {
|
|
break
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
func newScrapeLoop(ctx context.Context,
|
|
sc scraper,
|
|
l log.Logger,
|
|
buffers *pool.BytesPool,
|
|
sampleMutator labelsMutator,
|
|
reportSampleMutator labelsMutator,
|
|
appender func() storage.Appender,
|
|
) *scrapeLoop {
|
|
if l == nil {
|
|
l = log.NewNopLogger()
|
|
}
|
|
if buffers == nil {
|
|
buffers = pool.NewBytesPool(1e3, 1e6, 3)
|
|
}
|
|
sl := &scrapeLoop{
|
|
scraper: sc,
|
|
buffers: buffers,
|
|
cache: newScrapeCache(),
|
|
appender: appender,
|
|
sampleMutator: sampleMutator,
|
|
reportSampleMutator: reportSampleMutator,
|
|
stopped: make(chan struct{}),
|
|
l: l,
|
|
ctx: ctx,
|
|
}
|
|
sl.scrapeCtx, sl.cancel = context.WithCancel(ctx)
|
|
|
|
return sl
|
|
}
|
|
|
|
func (sl *scrapeLoop) run(interval, timeout time.Duration, errc chan<- error) {
|
|
select {
|
|
case <-time.After(sl.scraper.offset(interval)):
|
|
// Continue after a scraping offset.
|
|
case <-sl.scrapeCtx.Done():
|
|
close(sl.stopped)
|
|
return
|
|
}
|
|
|
|
var last time.Time
|
|
|
|
ticker := time.NewTicker(interval)
|
|
defer ticker.Stop()
|
|
|
|
buf := bytes.NewBuffer(make([]byte, 0, 16000))
|
|
|
|
mainLoop:
|
|
for {
|
|
buf.Reset()
|
|
select {
|
|
case <-sl.ctx.Done():
|
|
close(sl.stopped)
|
|
return
|
|
case <-sl.scrapeCtx.Done():
|
|
break mainLoop
|
|
default:
|
|
}
|
|
|
|
var (
|
|
start = time.Now()
|
|
scrapeCtx, cancel = context.WithTimeout(sl.ctx, timeout)
|
|
)
|
|
|
|
// Only record after the first scrape.
|
|
if !last.IsZero() {
|
|
targetIntervalLength.WithLabelValues(interval.String()).Observe(
|
|
time.Since(last).Seconds(),
|
|
)
|
|
}
|
|
b := sl.buffers.Get(sl.lastScrapeSize)
|
|
buf := bytes.NewBuffer(b)
|
|
|
|
scrapeErr := sl.scraper.scrape(scrapeCtx, buf)
|
|
cancel()
|
|
|
|
if scrapeErr == nil {
|
|
b = buf.Bytes()
|
|
// NOTE: There were issues with misbehaving clients in the past
|
|
// that occasionally returned empty results. We don't want those
|
|
// to falsely reset our buffer size.
|
|
if len(b) > 0 {
|
|
sl.lastScrapeSize = len(b)
|
|
}
|
|
} else {
|
|
level.Debug(sl.l).Log("msg", "Scrape failed", "err", scrapeErr.Error())
|
|
if errc != nil {
|
|
errc <- scrapeErr
|
|
}
|
|
}
|
|
|
|
// A failed scrape is the same as an empty scrape,
|
|
// we still call sl.append to trigger stale markers.
|
|
total, added, appErr := sl.append(b, start)
|
|
if appErr != nil {
|
|
level.Warn(sl.l).Log("msg", "append failed", "err", appErr)
|
|
// The append failed, probably due to a parse error or sample limit.
|
|
// Call sl.append again with an empty scrape to trigger stale markers.
|
|
if _, _, err := sl.append([]byte{}, start); err != nil {
|
|
level.Warn(sl.l).Log("msg", "append failed", "err", err)
|
|
}
|
|
}
|
|
|
|
sl.buffers.Put(b)
|
|
|
|
if scrapeErr == nil {
|
|
scrapeErr = appErr
|
|
}
|
|
|
|
sl.report(start, time.Since(start), total, added, scrapeErr)
|
|
last = start
|
|
|
|
select {
|
|
case <-sl.ctx.Done():
|
|
close(sl.stopped)
|
|
return
|
|
case <-sl.scrapeCtx.Done():
|
|
break mainLoop
|
|
case <-ticker.C:
|
|
}
|
|
}
|
|
|
|
close(sl.stopped)
|
|
|
|
sl.endOfRunStaleness(last, ticker, interval)
|
|
}
|
|
|
|
func (sl *scrapeLoop) endOfRunStaleness(last time.Time, ticker *time.Ticker, interval time.Duration) {
|
|
// Scraping has stopped. We want to write stale markers but
|
|
// the target may be recreated, so we wait just over 2 scrape intervals
|
|
// before creating them.
|
|
// If the context is cancelled, we presume the server is shutting down
|
|
// and will restart where is was. We do not attempt to write stale markers
|
|
// in this case.
|
|
|
|
if last.IsZero() {
|
|
// There never was a scrape, so there will be no stale markers.
|
|
return
|
|
}
|
|
|
|
// Wait for when the next scrape would have been, record its timestamp.
|
|
var staleTime time.Time
|
|
select {
|
|
case <-sl.ctx.Done():
|
|
return
|
|
case <-ticker.C:
|
|
staleTime = time.Now()
|
|
}
|
|
|
|
// Wait for when the next scrape would have been, if the target was recreated
|
|
// samples should have been ingested by now.
|
|
select {
|
|
case <-sl.ctx.Done():
|
|
return
|
|
case <-ticker.C:
|
|
}
|
|
|
|
// Wait for an extra 10% of the interval, just to be safe.
|
|
select {
|
|
case <-sl.ctx.Done():
|
|
return
|
|
case <-time.After(interval / 10):
|
|
}
|
|
|
|
// Call sl.append again with an empty scrape to trigger stale markers.
|
|
// If the target has since been recreated and scraped, the
|
|
// stale markers will be out of order and ignored.
|
|
if _, _, err := sl.append([]byte{}, staleTime); err != nil {
|
|
level.Error(sl.l).Log("msg", "stale append failed", "err", err)
|
|
}
|
|
if err := sl.reportStale(staleTime); err != nil {
|
|
level.Error(sl.l).Log("msg", "stale report failed", "err", err)
|
|
}
|
|
}
|
|
|
|
// Stop the scraping. May still write data and stale markers after it has
|
|
// returned. Cancel the context to stop all writes.
|
|
func (sl *scrapeLoop) stop() {
|
|
sl.cancel()
|
|
<-sl.stopped
|
|
}
|
|
|
|
type sample struct {
|
|
metric labels.Labels
|
|
t int64
|
|
v float64
|
|
}
|
|
|
|
type samples []sample
|
|
|
|
func (s samples) Len() int { return len(s) }
|
|
func (s samples) Swap(i, j int) { s[i], s[j] = s[j], s[i] }
|
|
|
|
func (s samples) Less(i, j int) bool {
|
|
d := labels.Compare(s[i].metric, s[j].metric)
|
|
if d < 0 {
|
|
return true
|
|
} else if d > 0 {
|
|
return false
|
|
}
|
|
return s[i].t < s[j].t
|
|
}
|
|
|
|
func (sl *scrapeLoop) append(b []byte, ts time.Time) (total, added int, err error) {
|
|
var (
|
|
app = sl.appender()
|
|
p = textparse.New(b)
|
|
defTime = timestamp.FromTime(ts)
|
|
numOutOfOrder = 0
|
|
numDuplicates = 0
|
|
numOutOfBounds = 0
|
|
)
|
|
var sampleLimitErr error
|
|
|
|
loop:
|
|
for p.Next() {
|
|
total++
|
|
|
|
t := defTime
|
|
met, tp, v := p.At()
|
|
if tp != nil {
|
|
t = *tp
|
|
}
|
|
|
|
if sl.cache.getDropped(yoloString(met)) {
|
|
continue
|
|
}
|
|
ce, ok := sl.cache.get(yoloString(met))
|
|
if ok {
|
|
switch err = app.AddFast(ce.lset, ce.ref, t, v); err {
|
|
case nil:
|
|
if tp == nil {
|
|
sl.cache.trackStaleness(ce.hash, ce.lset)
|
|
}
|
|
case storage.ErrNotFound:
|
|
ok = false
|
|
case storage.ErrOutOfOrderSample:
|
|
numOutOfOrder++
|
|
level.Debug(sl.l).Log("msg", "Out of order sample", "series", string(met))
|
|
targetScrapeSampleOutOfOrder.Inc()
|
|
continue
|
|
case storage.ErrDuplicateSampleForTimestamp:
|
|
numDuplicates++
|
|
level.Debug(sl.l).Log("msg", "Duplicate sample for timestamp", "series", string(met))
|
|
targetScrapeSampleDuplicate.Inc()
|
|
continue
|
|
case storage.ErrOutOfBounds:
|
|
numOutOfBounds++
|
|
level.Debug(sl.l).Log("msg", "Out of bounds metric", "series", string(met))
|
|
targetScrapeSampleOutOfBounds.Inc()
|
|
continue
|
|
case errSampleLimit:
|
|
// Keep on parsing output if we hit the limit, so we report the correct
|
|
// total number of samples scraped.
|
|
sampleLimitErr = err
|
|
added++
|
|
continue
|
|
default:
|
|
break loop
|
|
}
|
|
}
|
|
if !ok {
|
|
var lset labels.Labels
|
|
|
|
mets := p.Metric(&lset)
|
|
hash := lset.Hash()
|
|
|
|
// Hash label set as it is seen local to the target. Then add target labels
|
|
// and relabeling and store the final label set.
|
|
lset = sl.sampleMutator(lset)
|
|
|
|
// The label set may be set to nil to indicate dropping.
|
|
if lset == nil {
|
|
sl.cache.addDropped(mets)
|
|
continue
|
|
}
|
|
|
|
var ref uint64
|
|
ref, err = app.Add(lset, t, v)
|
|
// TODO(fabxc): also add a dropped-cache?
|
|
switch err {
|
|
case nil:
|
|
case storage.ErrOutOfOrderSample:
|
|
err = nil
|
|
numOutOfOrder++
|
|
level.Debug(sl.l).Log("msg", "Out of order sample", "series", string(met))
|
|
targetScrapeSampleOutOfOrder.Inc()
|
|
continue
|
|
case storage.ErrDuplicateSampleForTimestamp:
|
|
err = nil
|
|
numDuplicates++
|
|
level.Debug(sl.l).Log("msg", "Duplicate sample for timestamp", "series", string(met))
|
|
targetScrapeSampleDuplicate.Inc()
|
|
continue
|
|
case storage.ErrOutOfBounds:
|
|
err = nil
|
|
numOutOfBounds++
|
|
level.Debug(sl.l).Log("msg", "Out of bounds metric", "series", string(met))
|
|
targetScrapeSampleOutOfBounds.Inc()
|
|
continue
|
|
case errSampleLimit:
|
|
sampleLimitErr = err
|
|
added++
|
|
continue
|
|
default:
|
|
level.Debug(sl.l).Log("msg", "unexpected error", "series", string(met), "err", err)
|
|
break loop
|
|
}
|
|
if tp == nil {
|
|
// Bypass staleness logic if there is an explicit timestamp.
|
|
sl.cache.trackStaleness(hash, lset)
|
|
}
|
|
sl.cache.addRef(mets, ref, lset, hash)
|
|
}
|
|
added++
|
|
}
|
|
if err == nil {
|
|
err = p.Err()
|
|
}
|
|
if err == nil && sampleLimitErr != nil {
|
|
targetScrapeSampleLimit.Inc()
|
|
err = sampleLimitErr
|
|
}
|
|
if numOutOfOrder > 0 {
|
|
level.Warn(sl.l).Log("msg", "Error on ingesting out-of-order samples", "num_dropped", numOutOfOrder)
|
|
}
|
|
if numDuplicates > 0 {
|
|
level.Warn(sl.l).Log("msg", "Error on ingesting samples with different value but same timestamp", "num_dropped", numDuplicates)
|
|
}
|
|
if numOutOfBounds > 0 {
|
|
level.Warn(sl.l).Log("msg", "Error on ingesting samples that are too old or are too far into the future", "num_dropped", numOutOfBounds)
|
|
}
|
|
if err == nil {
|
|
sl.cache.forEachStale(func(lset labels.Labels) bool {
|
|
// Series no longer exposed, mark it stale.
|
|
_, err = app.Add(lset, defTime, math.Float64frombits(value.StaleNaN))
|
|
switch err {
|
|
case storage.ErrOutOfOrderSample, storage.ErrDuplicateSampleForTimestamp:
|
|
// Do not count these in logging, as this is expected if a target
|
|
// goes away and comes back again with a new scrape loop.
|
|
err = nil
|
|
}
|
|
return err == nil
|
|
})
|
|
}
|
|
if err != nil {
|
|
app.Rollback()
|
|
return total, added, err
|
|
}
|
|
if err := app.Commit(); err != nil {
|
|
return total, added, err
|
|
}
|
|
|
|
sl.cache.iterDone()
|
|
|
|
return total, added, nil
|
|
}
|
|
|
|
func yoloString(b []byte) string {
|
|
return *((*string)(unsafe.Pointer(&b)))
|
|
}
|
|
|
|
// The constants are suffixed with the invalid \xff unicode rune to avoid collisions
|
|
// with scraped metrics in the cache.
|
|
const (
|
|
scrapeHealthMetricName = "up" + "\xff"
|
|
scrapeDurationMetricName = "scrape_duration_seconds" + "\xff"
|
|
scrapeSamplesMetricName = "scrape_samples_scraped" + "\xff"
|
|
samplesPostRelabelMetricName = "scrape_samples_post_metric_relabeling" + "\xff"
|
|
)
|
|
|
|
func (sl *scrapeLoop) report(start time.Time, duration time.Duration, scraped, appended int, err error) error {
|
|
sl.scraper.report(start, duration, err)
|
|
|
|
ts := timestamp.FromTime(start)
|
|
|
|
var health float64
|
|
if err == nil {
|
|
health = 1
|
|
}
|
|
app := sl.appender()
|
|
|
|
if err := sl.addReportSample(app, scrapeHealthMetricName, ts, health); err != nil {
|
|
app.Rollback()
|
|
return err
|
|
}
|
|
if err := sl.addReportSample(app, scrapeDurationMetricName, ts, duration.Seconds()); err != nil {
|
|
app.Rollback()
|
|
return err
|
|
}
|
|
if err := sl.addReportSample(app, scrapeSamplesMetricName, ts, float64(scraped)); err != nil {
|
|
app.Rollback()
|
|
return err
|
|
}
|
|
if err := sl.addReportSample(app, samplesPostRelabelMetricName, ts, float64(appended)); err != nil {
|
|
app.Rollback()
|
|
return err
|
|
}
|
|
return app.Commit()
|
|
}
|
|
|
|
func (sl *scrapeLoop) reportStale(start time.Time) error {
|
|
ts := timestamp.FromTime(start)
|
|
app := sl.appender()
|
|
|
|
stale := math.Float64frombits(value.StaleNaN)
|
|
|
|
if err := sl.addReportSample(app, scrapeHealthMetricName, ts, stale); err != nil {
|
|
app.Rollback()
|
|
return err
|
|
}
|
|
if err := sl.addReportSample(app, scrapeDurationMetricName, ts, stale); err != nil {
|
|
app.Rollback()
|
|
return err
|
|
}
|
|
if err := sl.addReportSample(app, scrapeSamplesMetricName, ts, stale); err != nil {
|
|
app.Rollback()
|
|
return err
|
|
}
|
|
if err := sl.addReportSample(app, samplesPostRelabelMetricName, ts, stale); err != nil {
|
|
app.Rollback()
|
|
return err
|
|
}
|
|
return app.Commit()
|
|
}
|
|
|
|
func (sl *scrapeLoop) addReportSample(app storage.Appender, s string, t int64, v float64) error {
|
|
ce, ok := sl.cache.get(s)
|
|
if ok {
|
|
err := app.AddFast(ce.lset, ce.ref, t, v)
|
|
switch err {
|
|
case nil:
|
|
return nil
|
|
case storage.ErrNotFound:
|
|
// Try an Add.
|
|
case storage.ErrOutOfOrderSample, storage.ErrDuplicateSampleForTimestamp:
|
|
// Do not log here, as this is expected if a target goes away and comes back
|
|
// again with a new scrape loop.
|
|
return nil
|
|
default:
|
|
return err
|
|
}
|
|
}
|
|
lset := labels.Labels{
|
|
// The constants are suffixed with the invalid \xff unicode rune to avoid collisions
|
|
// with scraped metrics in the cache.
|
|
// We have to drop it when building the actual metric.
|
|
labels.Label{Name: labels.MetricName, Value: s[:len(s)-1]},
|
|
}
|
|
|
|
hash := lset.Hash()
|
|
lset = sl.reportSampleMutator(lset)
|
|
|
|
ref, err := app.Add(lset, t, v)
|
|
switch err {
|
|
case nil:
|
|
sl.cache.addRef(s, ref, lset, hash)
|
|
return nil
|
|
case storage.ErrOutOfOrderSample, storage.ErrDuplicateSampleForTimestamp:
|
|
return nil
|
|
default:
|
|
return err
|
|
}
|
|
}
|