Fix error on ingesting out-of-order exemplars (#13021)
Fix and improve ingesting exemplars for native histograms. See code comment for a detailed explanation of the algorithm. Note that this changes the current behavior for all kind of samples slightly: We now allow exemplars with the same timestamp as during the last scrape if the value or the labels have changed. Also note that we now do not ingest exemplars without timestamps for native histograms anymore. Signed-off-by: Jeanette Tan <jeanette.tan@grafana.com> Signed-off-by: György Krajcsovits <gyorgy.krajcsovits@grafana.com> Co-authored-by: Björn Rabenstein <github@rabenste.in> --------- Signed-off-by: Jeanette Tan <jeanette.tan@grafana.com> Signed-off-by: György Krajcsovits <gyorgy.krajcsovits@grafana.com> Signed-off-by: zenador <zenador@users.noreply.github.com> Co-authored-by: György Krajcsovits <gyorgy.krajcsovits@grafana.com> Co-authored-by: Björn Rabenstein <github@rabenste.in>
This commit is contained in:
parent
362a0b0d14
commit
32ee1b15de
|
@ -317,22 +317,28 @@ func (p *ProtobufParser) Exemplar(ex *exemplar.Exemplar) bool {
|
|||
exProto = m.GetCounter().GetExemplar()
|
||||
case dto.MetricType_HISTOGRAM, dto.MetricType_GAUGE_HISTOGRAM:
|
||||
bb := m.GetHistogram().GetBucket()
|
||||
isClassic := p.state == EntrySeries
|
||||
if p.fieldPos < 0 {
|
||||
if p.state == EntrySeries {
|
||||
if isClassic {
|
||||
return false // At _count or _sum.
|
||||
}
|
||||
p.fieldPos = 0 // Start at 1st bucket for native histograms.
|
||||
}
|
||||
for p.fieldPos < len(bb) {
|
||||
exProto = bb[p.fieldPos].GetExemplar()
|
||||
if p.state == EntrySeries {
|
||||
if isClassic {
|
||||
break
|
||||
}
|
||||
p.fieldPos++
|
||||
if exProto != nil {
|
||||
break
|
||||
// We deliberately drop exemplars with no timestamp only for native histograms.
|
||||
if exProto != nil && (isClassic || exProto.GetTimestamp() != nil) {
|
||||
break // Found a classic histogram exemplar or a native histogram exemplar with a timestamp.
|
||||
}
|
||||
}
|
||||
// If the last exemplar for native histograms has no timestamp, ignore it.
|
||||
if !isClassic && exProto.GetTimestamp() == nil {
|
||||
return false
|
||||
}
|
||||
default:
|
||||
return false
|
||||
}
|
||||
|
|
|
@ -729,7 +729,6 @@ func TestProtobufParse(t *testing.T) {
|
|||
),
|
||||
e: []exemplar.Exemplar{
|
||||
{Labels: labels.FromStrings("dummyID", "59727"), Value: -0.00039, HasTs: true, Ts: 1625851155146},
|
||||
{Labels: labels.FromStrings("dummyID", "5617"), Value: -0.00029, HasTs: false},
|
||||
},
|
||||
},
|
||||
{
|
||||
|
@ -766,7 +765,6 @@ func TestProtobufParse(t *testing.T) {
|
|||
),
|
||||
e: []exemplar.Exemplar{
|
||||
{Labels: labels.FromStrings("dummyID", "59727"), Value: -0.00039, HasTs: true, Ts: 1625851155146},
|
||||
{Labels: labels.FromStrings("dummyID", "5617"), Value: -0.00029, HasTs: false},
|
||||
},
|
||||
},
|
||||
{
|
||||
|
@ -802,7 +800,6 @@ func TestProtobufParse(t *testing.T) {
|
|||
),
|
||||
e: []exemplar.Exemplar{
|
||||
{Labels: labels.FromStrings("dummyID", "59727"), Value: -0.00039, HasTs: true, Ts: 1625851155146},
|
||||
{Labels: labels.FromStrings("dummyID", "5617"), Value: -0.00029, HasTs: false},
|
||||
},
|
||||
},
|
||||
{
|
||||
|
@ -839,7 +836,6 @@ func TestProtobufParse(t *testing.T) {
|
|||
),
|
||||
e: []exemplar.Exemplar{
|
||||
{Labels: labels.FromStrings("dummyID", "59727"), Value: -0.00039, HasTs: true, Ts: 1625851155146},
|
||||
{Labels: labels.FromStrings("dummyID", "5617"), Value: -0.00029, HasTs: false},
|
||||
},
|
||||
},
|
||||
{
|
||||
|
@ -1233,7 +1229,6 @@ func TestProtobufParse(t *testing.T) {
|
|||
),
|
||||
e: []exemplar.Exemplar{
|
||||
{Labels: labels.FromStrings("dummyID", "59727"), Value: -0.00039, HasTs: true, Ts: 1625851155146},
|
||||
{Labels: labels.FromStrings("dummyID", "5617"), Value: -0.00029, HasTs: false},
|
||||
},
|
||||
},
|
||||
{ // 12
|
||||
|
@ -1328,7 +1323,6 @@ func TestProtobufParse(t *testing.T) {
|
|||
),
|
||||
e: []exemplar.Exemplar{
|
||||
{Labels: labels.FromStrings("dummyID", "59727"), Value: -0.00039, HasTs: true, Ts: 1625851155146},
|
||||
{Labels: labels.FromStrings("dummyID", "5617"), Value: -0.00029, HasTs: false},
|
||||
},
|
||||
},
|
||||
{ // 21
|
||||
|
@ -1422,7 +1416,6 @@ func TestProtobufParse(t *testing.T) {
|
|||
),
|
||||
e: []exemplar.Exemplar{
|
||||
{Labels: labels.FromStrings("dummyID", "59727"), Value: -0.00039, HasTs: true, Ts: 1625851155146},
|
||||
{Labels: labels.FromStrings("dummyID", "5617"), Value: -0.00029, HasTs: false},
|
||||
},
|
||||
},
|
||||
{ // 30
|
||||
|
@ -1517,7 +1510,6 @@ func TestProtobufParse(t *testing.T) {
|
|||
),
|
||||
e: []exemplar.Exemplar{
|
||||
{Labels: labels.FromStrings("dummyID", "59727"), Value: -0.00039, HasTs: true, Ts: 1625851155146},
|
||||
{Labels: labels.FromStrings("dummyID", "5617"), Value: -0.00029, HasTs: false},
|
||||
},
|
||||
},
|
||||
{ // 39
|
||||
|
|
|
@ -24,6 +24,7 @@ import (
|
|||
"math"
|
||||
"net/http"
|
||||
"reflect"
|
||||
"sort"
|
||||
"strconv"
|
||||
"strings"
|
||||
"sync"
|
||||
|
@ -1404,6 +1405,8 @@ func (sl *scrapeLoop) append(app storage.Appender, b []byte, contentType string,
|
|||
metadataChanged bool
|
||||
)
|
||||
|
||||
exemplars := make([]exemplar.Exemplar, 1)
|
||||
|
||||
// updateMetadata updates the current iteration's metadata object and the
|
||||
// metadataChanged value if we have metadata in the scrape cache AND the
|
||||
// labelset is for a new series or the metadata for this series has just
|
||||
|
@ -1569,18 +1572,55 @@ loop:
|
|||
// Increment added even if there's an error so we correctly report the
|
||||
// number of samples remaining after relabeling.
|
||||
added++
|
||||
|
||||
exemplars = exemplars[:0] // Reset and reuse the exemplar slice.
|
||||
for hasExemplar := p.Exemplar(&e); hasExemplar; hasExemplar = p.Exemplar(&e) {
|
||||
if !e.HasTs {
|
||||
if isHistogram {
|
||||
// We drop exemplars for native histograms if they don't have a timestamp.
|
||||
// Missing timestamps are deliberately not supported as we want to start
|
||||
// enforcing timestamps for exemplars as otherwise proper deduplication
|
||||
// is inefficient and purely based on heuristics: we cannot distinguish
|
||||
// between repeated exemplars and new instances with the same values.
|
||||
// This is done silently without logs as it is not an error but out of spec.
|
||||
// This does not affect classic histograms so that behaviour is unchanged.
|
||||
e = exemplar.Exemplar{} // Reset for next time round loop.
|
||||
continue
|
||||
}
|
||||
e.Ts = t
|
||||
}
|
||||
exemplars = append(exemplars, e)
|
||||
e = exemplar.Exemplar{} // Reset for next time round loop.
|
||||
}
|
||||
sort.Slice(exemplars, func(i, j int) bool {
|
||||
// Sort first by timestamp, then value, then labels so the checking
|
||||
// for duplicates / out of order is more efficient during validation.
|
||||
if exemplars[i].Ts != exemplars[j].Ts {
|
||||
return exemplars[i].Ts < exemplars[j].Ts
|
||||
}
|
||||
if exemplars[i].Value != exemplars[j].Value {
|
||||
return exemplars[i].Value < exemplars[j].Value
|
||||
}
|
||||
return exemplars[i].Labels.Hash() < exemplars[j].Labels.Hash()
|
||||
})
|
||||
outOfOrderExemplars := 0
|
||||
for _, e := range exemplars {
|
||||
_, exemplarErr := app.AppendExemplar(ref, lset, e)
|
||||
exemplarErr = sl.checkAddExemplarError(exemplarErr, e, &appErrs)
|
||||
if exemplarErr != nil {
|
||||
switch {
|
||||
case exemplarErr == nil:
|
||||
// Do nothing.
|
||||
case errors.Is(exemplarErr, storage.ErrOutOfOrderExemplar):
|
||||
outOfOrderExemplars++
|
||||
default:
|
||||
// Since exemplar storage is still experimental, we don't fail the scrape on ingestion errors.
|
||||
level.Debug(sl.l).Log("msg", "Error while adding exemplar in AddExemplar", "exemplar", fmt.Sprintf("%+v", e), "err", exemplarErr)
|
||||
}
|
||||
e = exemplar.Exemplar{} // reset for next time round loop
|
||||
}
|
||||
if outOfOrderExemplars > 0 && outOfOrderExemplars == len(exemplars) {
|
||||
// Only report out of order exemplars if all are out of order, otherwise this was a partial update
|
||||
// to some existing set of exemplars.
|
||||
appErrs.numExemplarOutOfOrder += outOfOrderExemplars
|
||||
level.Debug(sl.l).Log("msg", "Out of order exemplars", "count", outOfOrderExemplars, "latest", fmt.Sprintf("%+v", exemplars[len(exemplars)-1]))
|
||||
sl.metrics.targetScrapeExemplarOutOfOrder.Add(float64(outOfOrderExemplars))
|
||||
}
|
||||
|
||||
if sl.appendMetadataToWAL && metadataChanged {
|
||||
|
@ -1673,20 +1713,6 @@ func (sl *scrapeLoop) checkAddError(ce *cacheEntry, met []byte, tp *int64, err e
|
|||
}
|
||||
}
|
||||
|
||||
func (sl *scrapeLoop) checkAddExemplarError(err error, e exemplar.Exemplar, appErrs *appendErrors) error {
|
||||
switch {
|
||||
case errors.Is(err, storage.ErrNotFound):
|
||||
return storage.ErrNotFound
|
||||
case errors.Is(err, storage.ErrOutOfOrderExemplar):
|
||||
appErrs.numExemplarOutOfOrder++
|
||||
level.Debug(sl.l).Log("msg", "Out of order exemplar", "exemplar", fmt.Sprintf("%+v", e))
|
||||
sl.metrics.targetScrapeExemplarOutOfOrder.Inc()
|
||||
return nil
|
||||
default:
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
// The constants are suffixed with the invalid \xff unicode rune to avoid collisions
|
||||
// with scraped metrics in the cache.
|
||||
var (
|
||||
|
|
|
@ -2155,7 +2155,7 @@ metric_total{n="2"} 2 # {t="2"} 2.0 20000
|
|||
},
|
||||
},
|
||||
{
|
||||
title: "Native histogram with two exemplars",
|
||||
title: "Native histogram with three exemplars",
|
||||
scrapeText: `name: "test_histogram"
|
||||
help: "Test histogram with many buckets removed to keep it manageable in size."
|
||||
type: HISTOGRAM
|
||||
|
@ -2193,6 +2193,21 @@ metric: <
|
|||
value: -0.00029
|
||||
>
|
||||
>
|
||||
bucket: <
|
||||
cumulative_count: 32
|
||||
upper_bound: -0.0001899999999999998
|
||||
exemplar: <
|
||||
label: <
|
||||
name: "dummyID"
|
||||
value: "58215"
|
||||
>
|
||||
value: -0.00019
|
||||
timestamp: <
|
||||
seconds: 1625851055
|
||||
nanos: 146848599
|
||||
>
|
||||
>
|
||||
>
|
||||
schema: 3
|
||||
zero_threshold: 2.938735877055719e-39
|
||||
zero_count: 2
|
||||
|
@ -2248,12 +2263,13 @@ metric: <
|
|||
},
|
||||
}},
|
||||
exemplars: []exemplar.Exemplar{
|
||||
// Native histogram exemplars are arranged by timestamp, and those with missing timestamps are dropped.
|
||||
{Labels: labels.FromStrings("dummyID", "58215"), Value: -0.00019, Ts: 1625851055146, HasTs: true},
|
||||
{Labels: labels.FromStrings("dummyID", "59727"), Value: -0.00039, Ts: 1625851155146, HasTs: true},
|
||||
{Labels: labels.FromStrings("dummyID", "5617"), Value: -0.00029, Ts: 1234568, HasTs: false},
|
||||
},
|
||||
},
|
||||
{
|
||||
title: "Native histogram with two exemplars scraped as classic histogram",
|
||||
title: "Native histogram with three exemplars scraped as classic histogram",
|
||||
scrapeText: `name: "test_histogram"
|
||||
help: "Test histogram with many buckets removed to keep it manageable in size."
|
||||
type: HISTOGRAM
|
||||
|
@ -2291,6 +2307,21 @@ metric: <
|
|||
value: -0.00029
|
||||
>
|
||||
>
|
||||
bucket: <
|
||||
cumulative_count: 32
|
||||
upper_bound: -0.0001899999999999998
|
||||
exemplar: <
|
||||
label: <
|
||||
name: "dummyID"
|
||||
value: "58215"
|
||||
>
|
||||
value: -0.00019
|
||||
timestamp: <
|
||||
seconds: 1625851055
|
||||
nanos: 146848599
|
||||
>
|
||||
>
|
||||
>
|
||||
schema: 3
|
||||
zero_threshold: 2.938735877055719e-39
|
||||
zero_count: 2
|
||||
|
@ -2332,6 +2363,7 @@ metric: <
|
|||
{metric: labels.FromStrings("__name__", "test_histogram_bucket", "le", "-0.0004899999999999998"), t: 1234568, f: 2},
|
||||
{metric: labels.FromStrings("__name__", "test_histogram_bucket", "le", "-0.0003899999999999998"), t: 1234568, f: 4},
|
||||
{metric: labels.FromStrings("__name__", "test_histogram_bucket", "le", "-0.0002899999999999998"), t: 1234568, f: 16},
|
||||
{metric: labels.FromStrings("__name__", "test_histogram_bucket", "le", "-0.0001899999999999998"), t: 1234568, f: 32},
|
||||
{metric: labels.FromStrings("__name__", "test_histogram_bucket", "le", "+Inf"), t: 1234568, f: 175},
|
||||
},
|
||||
histograms: []histogramSample{{
|
||||
|
@ -2355,10 +2387,15 @@ metric: <
|
|||
},
|
||||
}},
|
||||
exemplars: []exemplar.Exemplar{
|
||||
// Native histogram one is arranged by timestamp.
|
||||
// Exemplars with missing timestamps are dropped for native histograms.
|
||||
{Labels: labels.FromStrings("dummyID", "58215"), Value: -0.00019, Ts: 1625851055146, HasTs: true},
|
||||
{Labels: labels.FromStrings("dummyID", "59727"), Value: -0.00039, Ts: 1625851155146, HasTs: true},
|
||||
// Classic histogram one is in order of appearance.
|
||||
// Exemplars with missing timestamps are supported for classic histograms.
|
||||
{Labels: labels.FromStrings("dummyID", "59727"), Value: -0.00039, Ts: 1625851155146, HasTs: true},
|
||||
{Labels: labels.FromStrings("dummyID", "5617"), Value: -0.00029, Ts: 1234568, HasTs: false},
|
||||
{Labels: labels.FromStrings("dummyID", "59727"), Value: -0.00039, Ts: 1625851155146, HasTs: true},
|
||||
{Labels: labels.FromStrings("dummyID", "5617"), Value: -0.00029, Ts: 1234568, HasTs: false},
|
||||
{Labels: labels.FromStrings("dummyID", "58215"), Value: -0.00019, Ts: 1625851055146, HasTs: true},
|
||||
},
|
||||
},
|
||||
}
|
||||
|
|
|
@ -245,11 +245,26 @@ func (ce *CircularExemplarStorage) validateExemplar(key []byte, e exemplar.Exemp
|
|||
|
||||
// Check for duplicate vs last stored exemplar for this series.
|
||||
// NB these are expected, and appending them is a no-op.
|
||||
if ce.exemplars[idx.newest].exemplar.Equals(e) {
|
||||
// For floats and classic histograms, there is only 1 exemplar per series,
|
||||
// so this is sufficient. For native histograms with multiple exemplars per series,
|
||||
// we have another check below.
|
||||
newestExemplar := ce.exemplars[idx.newest].exemplar
|
||||
if newestExemplar.Equals(e) {
|
||||
return storage.ErrDuplicateExemplar
|
||||
}
|
||||
|
||||
if e.Ts <= ce.exemplars[idx.newest].exemplar.Ts {
|
||||
// Since during the scrape the exemplars are sorted first by timestamp, then value, then labels,
|
||||
// if any of these conditions are true, we know that the exemplar is either a duplicate
|
||||
// of a previous one (but not the most recent one as that is checked above) or out of order.
|
||||
// We now allow exemplars with duplicate timestamps as long as they have different values and/or labels
|
||||
// since that can happen for different buckets of a native histogram.
|
||||
// We do not distinguish between duplicates and out of order as iterating through the exemplars
|
||||
// to check for that would be expensive (versus just comparing with the most recent one) especially
|
||||
// since this is run under a lock, and not worth it as we just need to return an error so we do not
|
||||
// append the exemplar.
|
||||
if e.Ts < newestExemplar.Ts ||
|
||||
(e.Ts == newestExemplar.Ts && e.Value < newestExemplar.Value) ||
|
||||
(e.Ts == newestExemplar.Ts && e.Value == newestExemplar.Value && e.Labels.Hash() < newestExemplar.Labels.Hash()) {
|
||||
if appended {
|
||||
ce.metrics.outOfOrderExemplars.Inc()
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue