From 80545bfb2eb8f9deeedc442130f7c4dc34525d8d Mon Sep 17 00:00:00 2001 From: Bartlomiej Plotka Date: Fri, 16 Apr 2021 14:44:53 +0200 Subject: [PATCH] Instrumented circular exemplar storage. (#8712) * Instrumented circular storage. Fixes: https://github.com/prometheus/prometheus/issues/8708 Fixes: https://github.com/prometheus/prometheus/issues/8707 Signed-off-by: Bartlomiej Plotka * Fixed CB. Signed-off-by: Bartlomiej Plotka * Addressed Julien comments. Signed-off-by: Bartlomiej Plotka * Addressed Callum comments. Signed-off-by: Bartlomiej Plotka --- cmd/prometheus/main.go | 2 +- tsdb/exemplar.go | 132 +++++++++++++++++++++++++---------------- tsdb/exemplar_test.go | 12 ++-- tsdb/head.go | 19 +++--- 4 files changed, 95 insertions(+), 70 deletions(-) diff --git a/cmd/prometheus/main.go b/cmd/prometheus/main.go index 0492fbe15..9fbd6e76b 100644 --- a/cmd/prometheus/main.go +++ b/cmd/prometheus/main.go @@ -151,7 +151,7 @@ func (c *flagConfig) setFeatureListOptions(logger log.Logger) error { level.Info(logger).Log("msg", "Experimental expand-external-labels enabled") case "exemplar-storage": c.tsdb.MaxExemplars = maxExemplars - level.Info(logger).Log("msg", "Experimental in-memory exemplar storage enabled") + level.Info(logger).Log("msg", "Experimental in-memory exemplar storage enabled", "maxExemplars", maxExemplars) case "": continue default: diff --git a/tsdb/exemplar.go b/tsdb/exemplar.go index 5a42f30a8..e6e6cb494 100644 --- a/tsdb/exemplar.go +++ b/tsdb/exemplar.go @@ -25,7 +25,11 @@ import ( ) type CircularExemplarStorage struct { - outOfOrderExemplars prometheus.Counter + exemplarsAppended prometheus.Counter + exemplarsInStorage prometheus.Gauge + seriesWithExemplarsInStorage prometheus.Gauge + lastExemplarsTs prometheus.Gauge + outOfOrderExemplars prometheus.Counter lock sync.RWMutex exemplars []*circularBufferEntry @@ -37,8 +41,8 @@ type CircularExemplarStorage struct { } type indexEntry struct { - first int - last int + oldest int + newest int } type circularBufferEntry struct { @@ -47,6 +51,7 @@ type circularBufferEntry struct { next int } +// NewCircularExemplarStorage creates an circular in memory exemplar storage. // If we assume the average case 95 bytes per exemplar we can fit 5651272 exemplars in // 1GB of extra memory, accounting for the fact that this is heap allocated space. // If len < 1, then the exemplar storage is disabled. @@ -57,14 +62,37 @@ func NewCircularExemplarStorage(len int, reg prometheus.Registerer) (ExemplarSto c := &CircularExemplarStorage{ exemplars: make([]*circularBufferEntry, len), index: make(map[string]*indexEntry), + exemplarsAppended: prometheus.NewCounter(prometheus.CounterOpts{ + Name: "prometheus_tsdb_exemplar_exemplars_appended_total", + Help: "Total number of appended exemplars.", + }), + exemplarsInStorage: prometheus.NewGauge(prometheus.GaugeOpts{ + Name: "prometheus_tsdb_exemplar_exemplars_in_storage", + Help: "Number of exemplars currently in circular storage.", + }), + seriesWithExemplarsInStorage: prometheus.NewGauge(prometheus.GaugeOpts{ + Name: "prometheus_tsdb_exemplar_series_with_exemplars_in_storage", + Help: "Number of series with exemplars currently in circular storage.", + }), + lastExemplarsTs: prometheus.NewGauge(prometheus.GaugeOpts{ + Name: "prometheus_tsdb_exemplar_last_exemplars_timestamp_seconds", + Help: "The timestamp of the oldest exemplar stored in circular storage. Useful to check for what time" + + "range the current exemplar buffer limit allows. This usually means the last timestamp" + + "for all exemplars for a typical setup. This is not true though if one of the series timestamp is in future compared to rest series.", + }), outOfOrderExemplars: prometheus.NewCounter(prometheus.CounterOpts{ Name: "prometheus_tsdb_exemplar_out_of_order_exemplars_total", Help: "Total number of out of order exemplar ingestion failed attempts", }), } - if reg != nil { - reg.MustRegister(c.outOfOrderExemplars) + reg.MustRegister( + c.exemplarsAppended, + c.exemplarsInStorage, + c.seriesWithExemplarsInStorage, + c.lastExemplarsTs, + c.outOfOrderExemplars, + ) } return c, nil @@ -78,7 +106,7 @@ func (ce *CircularExemplarStorage) ExemplarQuerier(_ context.Context) (storage.E return ce, nil } -func (ce *CircularExemplarStorage) Querier(ctx context.Context) (storage.ExemplarQuerier, error) { +func (ce *CircularExemplarStorage) Querier(_ context.Context) (storage.ExemplarQuerier, error) { return ce, nil } @@ -92,7 +120,7 @@ func (ce *CircularExemplarStorage) Select(start, end int64, matchers ...[]*label // Loop through each index entry, which will point us to first/last exemplar for each series. for _, idx := range ce.index { var se exemplar.QueryResult - e := ce.exemplars[idx.first] + e := ce.exemplars[idx.oldest] if !matchesSomeMatcherSet(e.seriesLabels, matchers) { continue } @@ -133,62 +161,66 @@ Outer: return false } -// indexGc takes the circularBufferEntry that will be overwritten and updates the -// storages index for that entries labelset if necessary. -func (ce *CircularExemplarStorage) indexGc(cbe *circularBufferEntry) { - if cbe == nil { - return - } - - l := cbe.seriesLabels.String() - i := cbe.next - if i == -1 { - delete(ce.index, l) - return - } - - ce.index[l] = &indexEntry{i, ce.index[l].last} -} - func (ce *CircularExemplarStorage) AddExemplar(l labels.Labels, e exemplar.Exemplar) error { seriesLabels := l.String() + + // TODO(bwplotka): This lock can lock all scrapers, there might high contention on this on scale. + // Optimize by moving the lock to be per series (& benchmark it). ce.lock.Lock() defer ce.lock.Unlock() idx, ok := ce.index[seriesLabels] if !ok { - ce.indexGc(ce.exemplars[ce.nextIndex]) - // Default the next value to -1 (which we use to detect that we've iterated through all exemplars for a series in Select) - // since this is the first exemplar stored for this series. - ce.exemplars[ce.nextIndex] = &circularBufferEntry{ - exemplar: e, - seriesLabels: l, - next: -1} - ce.index[seriesLabels] = &indexEntry{ce.nextIndex, ce.nextIndex} - ce.nextIndex = (ce.nextIndex + 1) % len(ce.exemplars) - return nil + ce.index[seriesLabels] = &indexEntry{oldest: ce.nextIndex} + } else { + // Check for duplicate vs last stored exemplar for this series. + // NB these are expected, add appending them is a no-op. + if ce.exemplars[idx.newest].exemplar.Equals(e) { + return nil + } + + if e.Ts <= ce.exemplars[idx.newest].exemplar.Ts { + ce.outOfOrderExemplars.Inc() + return storage.ErrOutOfOrderExemplar + } + + ce.exemplars[ce.index[seriesLabels].newest].next = ce.nextIndex } - // Check for duplicate vs last stored exemplar for this series. - // NB these are expected, add appending them is a no-op. - if ce.exemplars[idx.last].exemplar.Equals(e) { - return nil + if prev := ce.exemplars[ce.nextIndex]; prev == nil { + ce.exemplars[ce.nextIndex] = &circularBufferEntry{} + } else { + // There exists exemplar already on this ce.nextIndex entry, drop it, to make place + // for others. + prevLabels := prev.seriesLabels.String() + if prev.next == -1 { + // Last item for this series, remove index entry. + delete(ce.index, prevLabels) + } else { + ce.index[prevLabels].oldest = prev.next + } } - if e.Ts <= ce.exemplars[idx.last].exemplar.Ts { - ce.outOfOrderExemplars.Inc() - return storage.ErrOutOfOrderExemplar - } - ce.indexGc(ce.exemplars[ce.nextIndex]) - ce.exemplars[ce.nextIndex] = &circularBufferEntry{ - exemplar: e, - seriesLabels: l, - next: -1, - } + // Default the next value to -1 (which we use to detect that we've iterated through all exemplars for a series in Select) + // since this is the first exemplar stored for this series. + ce.exemplars[ce.nextIndex].exemplar = e + ce.exemplars[ce.nextIndex].next = -1 + ce.exemplars[ce.nextIndex].seriesLabels = l + ce.index[seriesLabels].newest = ce.nextIndex - ce.exemplars[ce.index[seriesLabels].last].next = ce.nextIndex - ce.index[seriesLabels].last = ce.nextIndex ce.nextIndex = (ce.nextIndex + 1) % len(ce.exemplars) + + ce.exemplarsAppended.Inc() + ce.seriesWithExemplarsInStorage.Set(float64(len(ce.index))) + if next := ce.exemplars[ce.nextIndex]; next != nil { + ce.exemplarsInStorage.Set(float64(len(ce.exemplars))) + ce.lastExemplarsTs.Set(float64(next.exemplar.Ts)) + return nil + } + + // We did not yet fill the buffer. + ce.exemplarsInStorage.Set(float64(ce.nextIndex)) + ce.lastExemplarsTs.Set(float64(ce.exemplars[0].exemplar.Ts)) return nil } diff --git a/tsdb/exemplar_test.go b/tsdb/exemplar_test.go index bf76502ff..76976f8be 100644 --- a/tsdb/exemplar_test.go +++ b/tsdb/exemplar_test.go @@ -46,7 +46,7 @@ func TestAddExemplar(t *testing.T) { err = es.AddExemplar(l, e) require.NoError(t, err) - require.Equal(t, es.index[l.String()].last, 0, "exemplar was not stored correctly") + require.Equal(t, es.index[l.String()].newest, 0, "exemplar was not stored correctly") e2 := exemplar.Exemplar{ Labels: labels.Labels{ @@ -61,8 +61,8 @@ func TestAddExemplar(t *testing.T) { err = es.AddExemplar(l, e2) require.NoError(t, err) - require.Equal(t, es.index[l.String()].last, 1, "exemplar was not stored correctly, location of newest exemplar for series in index did not update") - require.True(t, es.exemplars[es.index[l.String()].last].exemplar.Equals(e2), "exemplar was not stored correctly, expected %+v got: %+v", e2, es.exemplars[es.index[l.String()].last].exemplar) + require.Equal(t, es.index[l.String()].newest, 1, "exemplar was not stored correctly, location of newest exemplar for series in index did not update") + require.True(t, es.exemplars[es.index[l.String()].newest].exemplar.Equals(e2), "exemplar was not stored correctly, expected %+v got: %+v", e2, es.exemplars[es.index[l.String()].newest].exemplar) err = es.AddExemplar(l, e2) require.NoError(t, err, "no error is expected attempting to add duplicate exemplar") @@ -121,9 +121,7 @@ func TestSelectExemplar(t *testing.T) { require.NoError(t, err) es := exs.(*CircularExemplarStorage) - l := labels.Labels{ - {Name: "service", Value: "asdf"}, - } + l := labels.Labels{{Name: "service", Value: "asdf"}} e := exemplar.Exemplar{ Labels: labels.Labels{ labels.Label{ @@ -228,7 +226,7 @@ func TestSelectExemplar_TimeRange(t *testing.T) { Ts: int64(101 + i), }) require.NoError(t, err) - require.Equal(t, es.index[l.String()].last, i, "exemplar was not stored correctly") + require.Equal(t, es.index[l.String()].newest, i, "exemplar was not stored correctly") } m, err := labels.NewMatcher(labels.MatchEqual, l[0].Name, l[0].Value) diff --git a/tsdb/head.go b/tsdb/head.go index 353797b60..342aa312a 100644 --- a/tsdb/head.go +++ b/tsdb/head.go @@ -145,7 +145,6 @@ type headMetrics struct { samplesAppended prometheus.Counter outOfBoundSamples prometheus.Counter outOfOrderSamples prometheus.Counter - outOfOrderExemplars prometheus.Counter walTruncateDuration prometheus.Summary walCorruptionsTotal prometheus.Counter walTotalReplayDuration prometheus.Gauge @@ -222,10 +221,6 @@ func newHeadMetrics(h *Head, r prometheus.Registerer) *headMetrics { Name: "prometheus_tsdb_out_of_order_samples_total", Help: "Total number of out of order samples ingestion failed attempts.", }), - outOfOrderExemplars: prometheus.NewCounter(prometheus.CounterOpts{ - Name: "prometheus_tsdb_out_of_order_exemplars_total", - Help: "Total number of out of order exemplars ingestion failed attempts.", - }), headTruncateFail: prometheus.NewCounter(prometheus.CounterOpts{ Name: "prometheus_tsdb_head_truncations_failed_total", Help: "Total number of head truncations that failed.", @@ -273,7 +268,6 @@ func newHeadMetrics(h *Head, r prometheus.Registerer) *headMetrics { m.samplesAppended, m.outOfBoundSamples, m.outOfOrderSamples, - m.outOfOrderExemplars, m.headTruncateFail, m.headTruncateTotal, m.checkpointDeleteFail, @@ -1387,19 +1381,20 @@ func (a *headAppender) Commit() (err error) { } defer func() { a.closed = true }() if err := a.log(); err != nil { - //nolint: errcheck - a.Rollback() // Most likely the same error will happen again. + _ = a.Rollback() // Most likely the same error will happen again. return errors.Wrap(err, "write to WAL") } // No errors logging to WAL, so pass the exemplars along to the in memory storage. for _, e := range a.exemplars { s := a.head.series.getByID(e.ref) - err := a.exemplarAppender.AddExemplar(s.lset, e.exemplar) - if err == storage.ErrOutOfOrderExemplar { - a.head.metrics.outOfOrderExemplars.Inc() - } else if err != nil { + // We don't instrument exemplar appends here, all is instrumented by storage. + if err := a.exemplarAppender.AddExemplar(s.lset, e.exemplar); err != nil { + if err == storage.ErrOutOfOrderExemplar { + continue + } level.Debug(a.head.logger).Log("msg", "Unknown error while adding exemplar", "err", err) + continue } }